gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit 835c8e8b45487c1276426034b5afb915853b6eb1
parent f425c2aeef06d1a6105678c8b058bdde65a26e78
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Mon, 25 Jul 2022 13:43:57 +0200

RC1 for milestone 2 (NGI Assure)

Diffstat:
Msrc/gnunet/cmd/gnunet-service-dht-go/main.go | 8++++++++
Msrc/gnunet/config/config.go | 17+++++++++--------
Msrc/gnunet/core/core.go | 32+++++++++++++++++---------------
Msrc/gnunet/core/peer.go | 4+---
Msrc/gnunet/crypto/hash.go | 28++++++++++++++++++++--------
Msrc/gnunet/crypto/signature.go | 11+++++++++++
Msrc/gnunet/go.mod | 4+++-
Msrc/gnunet/go.sum | 4++--
Msrc/gnunet/message/msg_dht_p2p.go | 279+++++++++++++++++++++++++++++++++++++++++--------------------------------------
Msrc/gnunet/service/dht/blocks/filters.go | 80+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
Msrc/gnunet/service/dht/blocks/filters_test.go | 24++++++++++++++++++++++++
Msrc/gnunet/service/dht/blocks/generic.go | 44++++++++++++++++++++++++++------------------
Msrc/gnunet/service/dht/blocks/gns.go | 6+++---
Msrc/gnunet/service/dht/blocks/hello.go | 10++++++----
Asrc/gnunet/service/dht/local.go | 86+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/gnunet/service/dht/messages.go | 211++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
Msrc/gnunet/service/dht/module.go | 147+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
Asrc/gnunet/service/dht/path/elements.go | 129+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/gnunet/service/dht/path/handling.go | 263+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/gnunet/service/dht/path/handling_test.go | 133+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/gnunet/service/dht/resulthandler.go | 59+++++++++++++++++++++++++++++++++++++++++++++++------------
Msrc/gnunet/service/dht/routingtable.go | 126++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Msrc/gnunet/service/dht/routingtable_test.go | 8++++----
Msrc/gnunet/service/module.go | 10++++++++++
Msrc/gnunet/service/namecache/module.go | 10+++++-----
Msrc/gnunet/service/store/database.go | 12++++++------
Msrc/gnunet/service/store/dhtstore_test.go | 32++++++++++++++++++++++++--------
Msrc/gnunet/service/store/store.go | 9++++++++-
Msrc/gnunet/service/store/store_fs.go | 119+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Msrc/gnunet/service/store/store_fs_meta.go | 17++++++++++-------
Msrc/gnunet/transport/endpoint.go | 10++++------
Msrc/gnunet/transport/reader_writer.go | 32++------------------------------
Msrc/gnunet/transport/transport.go | 25+++++++++++++++++++++----
Msrc/gnunet/util/address.go | 12++++++------
Msrc/gnunet/util/map.go | 107++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
Msrc/gnunet/util/misc.go | 24++++++++++++++++++++++++
Msrc/gnunet/util/peer.go | 75++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------
Asrc/gnunet/util/peer_test.go | 38++++++++++++++++++++++++++++++++++++++
38 files changed, 1720 insertions(+), 525 deletions(-)

diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go @@ -23,6 +23,7 @@ import ( "flag" "os" "os/signal" + "runtime" "strings" "syscall" "time" @@ -185,6 +186,13 @@ loop: // handle heart beat case now := <-tick.C: logger.Println(logger.INFO, "[dht] Heart beat at "+now.String()) + // print some system statistics + logger.Printf(logger.INFO, "[dht] Number of Go routines: %15d", runtime.NumGoroutine()) + mem := new(runtime.MemStats) + runtime.ReadMemStats(mem) + logger.Printf(logger.INFO, "[dht] Allocated heap: %15d", mem.HeapAlloc) + logger.Printf(logger.INFO, "[dht] Idle heap: %15d", mem.HeapIdle) + logger.Printf(logger.INFO, "[dht] Total allocation: %15d", mem.TotalAlloc) } } diff --git a/src/gnunet/config/config.go b/src/gnunet/config/config.go @@ -212,15 +212,16 @@ func applySubstitutions(x interface{}, env map[string]string) { switch fld.Kind() { case reflect.String: // check for substitution - s, _ := fld.Interface().(string) - for { - s1 := substString(s, env) - if s1 == s { - break + if s, ok := fld.Interface().(string); ok { + for { + s1 := substString(s, env) + if s1 == s { + break + } + logger.Printf(logger.DBG, "[config] %s --> %s\n", s, s1) + fld.SetString(s1) + s = s1 } - logger.Printf(logger.DBG, "[config] %s --> %s\n", s, s1) - fld.SetString(s1) - s = s1 } case reflect.Struct: diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go @@ -23,6 +23,7 @@ import ( "encoding/hex" "errors" "gnunet/config" + "gnunet/crypto" "gnunet/message" "gnunet/transport" "gnunet/util" @@ -165,13 +166,13 @@ func (c *Core) pump(ctx context.Context) { select { // get (next) message from transport case tm := <-c.incoming: - logger.Printf(logger.DBG, "[core] Message received from %s: %s", tm.Peer, transport.Dump(tm.Msg, "json")) + logger.Printf(logger.DBG, "[core] Message received from %s: %s", tm.Peer, util.Dump(tm.Msg, "json")) // check if peer is already connected (has an entry in PeerAddrist) - _, connected := c.connected.Get(tm.Peer.String()) + _, connected := c.connected.Get(tm.Peer.String(), 0) if !connected { // no: mark connected - c.connected.Put(tm.Peer.String(), true) + c.connected.Put(tm.Peer.String(), true, 0) // generate EV_CONNECT event c.dispatch(&Event{ ID: EV_CONNECT, @@ -259,6 +260,11 @@ func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addrs []*util.Addre // learn all addresses for peer newPeer = false for _, addr := range addrs { + // filter out addresses we can't handle (including local addresses) + if !transport.CanHandleAddress(addr) { + continue + } + // learn address logger.Printf(logger.INFO, "[core] Learning %s for %s (expires %s)", addr.URI(), peer, addr.Expires) newPeer = (c.peers.Add(peer, addr) == 1) || newPeer } @@ -287,17 +293,8 @@ func (c *Core) PeerID() *util.PeerID { //---------------------------------------------------------------------- -// Signable interface for objects that can get signed by peer -type Signable interface { - // SignedData returns the byte array to be signed - SignedData() []byte - - // SetSignature returns the signature to the signable object - SetSignature(*util.PeerSignature) error -} - // Sign a signable onject with private peer key -func (c *Core) Sign(obj Signable) error { +func (c *Core) Sign(obj crypto.Signable) error { sd := obj.SignedData() logger.Printf(logger.DBG, "[core] Signing data '%s'", hex.EncodeToString(sd)) sig, err := c.local.prv.EdSign(sd) @@ -315,6 +312,7 @@ func (c *Core) Sign(obj Signable) error { // When the connection attempt is successful, information on the new // peer is offered through the PEER_CONNECTED signal. func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error { + // TODO: return nil } @@ -322,7 +320,9 @@ func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error { // connection to a peer P. Underlays are usually limited in the number // of active connections. With this function the DHT can indicate to the // underlay which connections should preferably be preserved. -func (c *Core) Hold(peer *util.PeerID) {} +func (c *Core) Hold(peer *util.PeerID) { + // TODO: +} // Drop is a function which tells the underlay to drop the connection to a // peer P. This function is only there for symmetry and used during the @@ -333,7 +333,9 @@ func (c *Core) Hold(peer *util.PeerID) {} // DROP() also does not imply that the underlay must close the connection: // it merely removes the preference to preserve the connection that was // established by HOLD(). -func (c *Core) Drop(peer *util.PeerID) {} +func (c *Core) Drop(peer *util.PeerID) { + // TODO: +} //---------------------------------------------------------------------- // Event listener and event dispatch. diff --git a/src/gnunet/core/peer.go b/src/gnunet/core/peer.go @@ -143,9 +143,7 @@ func (p *Peer) PubKey() *ed25519.PublicKey { // GetID returns the node ID (public key) in binary format func (p *Peer) GetID() *util.PeerID { - return &util.PeerID{ - Data: util.Clone(p.pub.Bytes()), - } + return util.NewPeerID(p.pub.Bytes()) } // GetIDString returns the string representation of the public key of the node. diff --git a/src/gnunet/crypto/hash.go b/src/gnunet/crypto/hash.go @@ -28,7 +28,7 @@ import ( // HashCode is the result of a 512-bit hash function (SHA-512) type HashCode struct { - Bits []byte `size:"64"` + Bits []byte `size:"(Size))"` } // Equals tests if two hash results are equal. @@ -36,9 +36,16 @@ func (hc *HashCode) Equals(n *HashCode) bool { return bytes.Equal(hc.Bits, n.Bits) } +// Size of binary data +func (hc *HashCode) Size() uint { + return 64 +} + // Clone the hash code func (hc *HashCode) Clone() *HashCode { - return NewHashCode(hc.Bits) + return &HashCode{ + Bits: util.Clone(hc.Bits), + } } // String returns a hex-representation of the hash code @@ -47,13 +54,18 @@ func (hc *HashCode) String() string { } // NewHashCode creates a new (initialized) hash value -func NewHashCode(buf []byte) *HashCode { - hc := &HashCode{ - Bits: make([]byte, 64), - } - if buf != nil { - util.CopyAlignedBlock(hc.Bits, buf) +func NewHashCode(data []byte) *HashCode { + hc := new(HashCode) + size := hc.Size() + v := make([]byte, size) + if data != nil && len(data) > 0 { + if uint(len(data)) < size { + util.CopyAlignedBlock(v, data) + } else { + copy(v, data[:size]) + } } + hc.Bits = v return hc } diff --git a/src/gnunet/crypto/signature.go b/src/gnunet/crypto/signature.go @@ -18,8 +18,19 @@ package crypto +import "gnunet/util" + // SignaturePurpose is the GNUnet data structure used as header for signed data. type SignaturePurpose struct { Size uint32 `order:"big"` // How many bytes are signed? Purpose uint32 `order:"big"` // Signature purpose } + +// Signable interface for objects that can get signed by peer +type Signable interface { + // SignedData returns the byte array to be signed + SignedData() []byte + + // SetSignature returns the signature to the signable object + SetSignature(*util.PeerSignature) error +} diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod @@ -3,7 +3,7 @@ module gnunet go 1.18 require ( - github.com/bfix/gospel v1.2.15 + github.com/bfix/gospel v1.2.17 github.com/go-redis/redis/v8 v8.11.5 github.com/go-sql-driver/mysql v1.6.0 github.com/gorilla/mux v1.8.0 @@ -23,3 +23,5 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.11 // indirect ) + +// replace github.com/bfix/gospel v1.2.17 => ../gospel diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum @@ -1,5 +1,5 @@ -github.com/bfix/gospel v1.2.15 h1:f0t8dvihSXWvhnXDI2q7FCtG7LHg5qImjEWdzIN/luY= -github.com/bfix/gospel v1.2.15/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= +github.com/bfix/gospel v1.2.17 h1:Stvm+OiCA2GIWIhI/HKc6uaLDMtrJNxXgw/g+v9witw= +github.com/bfix/gospel v1.2.17/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= diff --git a/src/gnunet/message/msg_dht_p2p.go b/src/gnunet/message/msg_dht_p2p.go @@ -27,11 +27,11 @@ import ( "gnunet/crypto" "gnunet/enums" "gnunet/service/dht/blocks" + "gnunet/service/dht/path" "gnunet/util" "time" "github.com/bfix/gospel/crypto/ed25519" - "github.com/bfix/gospel/data" "github.com/bfix/gospel/logger" ) @@ -39,90 +39,6 @@ import ( // DHT-P2P is a next-generation implementation of the R5N DHT. //====================================================================== -// shared path element data across types -type pathElementData struct { - Expiration util.AbsoluteTime // expiration date - BlockHash *crypto.HashCode // block hash - PeerPredecessor *util.PeerID // predecessor peer - PeerSuccessor *util.PeerID // successor peer -} - -// helper type for signature creation/verification -type pathElementSignedData struct { - Size uint16 `order:"big"` // size of signed data - Purpose uint16 `order:"big"` // signature purpose (SIG_DHT_HOP) - Elem *pathElementData `` // path element data -} - -// PathElement is the full-fledged data assembly for a path element in -// PUT/GET pathes. It is assembled programatically (on generation[1] and -// verification[2]) and not transferred in messages directly. -// -// [1] spe = &PathElement{...} -// core.Sign(spe) -// msg.putpath[i] = spe.Wire() -// -// [2] pe = &PathElement{...,Signature: wire.sig} -// if !pe.Verify(peerId) { ... } -// -type PathElement struct { - pathElementData - Signature *util.PeerSignature // signature -} - -// NewPathElement creates a new path element from data -func NewPathElement(key *crypto.HashCode, pred, succ *util.PeerID) *PathElement { - return &PathElement{ - pathElementData: pathElementData{ - Expiration: util.AbsoluteTimeNow().Add(12 * time.Hour), - BlockHash: key, - PeerPredecessor: pred, - PeerSuccessor: succ, - }, - Signature: nil, - } -} - -// PathElementWire is the data stored and retrieved from messages -type PathElementWire struct { - Predecessor *util.PeerID // peer id of predecessor - Signature *util.PeerSignature // path signature -} - -// Size returns the size of a path element in wire format -func (pew *PathElementWire) Size() uint16 { - return 96 -} - -// SignedData gets the data to be signed by peer ('Signable' interface) -func (pe *PathElement) SignedData() []byte { - sd := &pathElementSignedData{ - Size: 80, - Purpose: uint16(enums.SIG_DHT_HOP), - Elem: &(pe.pathElementData), - } - buf, err := data.Marshal(sd) - if err != nil { - logger.Println(logger.ERROR, "can't serialize path element for signature") - return nil - } - return buf -} - -// SetSignature stores the generated signature. -func (pe *PathElement) SetSignature(sig *util.PeerSignature) error { - pe.Signature = sig - return nil -} - -// Wire returns the path element suitable for inclusion into messages -func (pe *PathElement) Wire() *PathElementWire { - return &PathElementWire{ - Predecessor: pe.PeerPredecessor, - Signature: pe.Signature, - } -} - //---------------------------------------------------------------------- // DHT-P2P-GET messages are used to request information from other // peers in the DHT. @@ -171,7 +87,7 @@ func (m *DHTP2PGetMsg) Header() *Header { return &Header{m.MsgSize, m.MsgType} } -// Clone message +// Update message (forwarding) func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf blocks.ResultFilter, hop uint16) *DHTP2PGetMsg { buf := rf.Bytes() ns := uint16(len(buf)) @@ -197,39 +113,39 @@ func (m *DHTP2PGetMsg) Update(pf *blocks.PeerFilter, rf blocks.ResultFilter, hop // DHTP2PPutMsg wire layout type DHTP2PPutMsg struct { - MsgSize uint16 `order:"big"` // total size of message - MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) - BType uint32 `order:"big"` // block type - Flags uint16 `order:"big"` // processing flags - HopCount uint16 `order:"big"` // message hops - ReplLvl uint16 `order:"big"` // replication level - PathL uint16 `order:"big"` // path length - Expiration util.AbsoluteTime `` // expiration date - PeerFilter *blocks.PeerFilter `` // peer bloomfilter - Key *crypto.HashCode `` // query key to block - Origin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) - PutPath []*PathElementWire `size:"PathL"` // PUT path - LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) - Block []byte `size:"*"` // block data + MsgSize uint16 `order:"big"` // total size of message + MsgType uint16 `order:"big"` // DHT_P2P_PUT (146) + BType uint32 `order:"big"` // block type + Flags uint16 `order:"big"` // processing flags + HopCount uint16 `order:"big"` // message hops + ReplLvl uint16 `order:"big"` // replication level + PathL uint16 `order:"big"` // path length + Expiration util.AbsoluteTime `` // expiration date + PeerFilter *blocks.PeerFilter `` // peer bloomfilter + Key *crypto.HashCode `` // query key to block + TruncOrigin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) + PutPath []*path.Entry `size:"PathL"` // PUT path + LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) + Block []byte `size:"*"` // block data } // NewDHTP2PPutMsg creates an empty new DHTP2PPutMsg func NewDHTP2PPutMsg() *DHTP2PPutMsg { return &DHTP2PPutMsg{ - MsgSize: 218, // total size without path and block data - MsgType: DHT_P2P_PUT, // DHT_P2P_PUT (146) - BType: 0, // block type - Flags: 0, // processing flags - HopCount: 0, // message hops - ReplLvl: 0, // replication level - PathL: 0, // no PUT path - Expiration: util.AbsoluteTimeNever(), // expiration date - PeerFilter: blocks.NewPeerFilter(), // peer bloom filter - Key: crypto.NewHashCode(nil), // query key - Origin: nil, // no truncated path - PutPath: make([]*PathElementWire, 0), // empty PUT path - LastSig: nil, // no signature from last hop - Block: nil, // no block data + MsgSize: 218, // total size without path and block data + MsgType: DHT_P2P_PUT, // DHT_P2P_PUT (146) + BType: 0, // block type + Flags: 0, // processing flags + HopCount: 0, // message hops + ReplLvl: 0, // replication level + PathL: 0, // no PUT path + Expiration: util.AbsoluteTimeNever(), // expiration date + PeerFilter: blocks.NewPeerFilter(), // peer bloom filter + Key: crypto.NewHashCode(nil), // query key + TruncOrigin: nil, // no truncated path + PutPath: make([]*path.Entry, 0), // empty PUT path + LastSig: nil, // no signature from last hop + Block: nil, // no block data } } @@ -238,24 +154,117 @@ func (m *DHTP2PPutMsg) PESize(field string) uint { switch field { case "Origin": if m.Flags&enums.DHT_RO_TRUNCATED != 0 { - return 32 + return util.NewPeerID(nil).Size() } case "LastSig": if m.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { - return 64 + return util.NewPeerSignature(nil).Size() } } return 0 } -// AddPutPath adds an element to the PUT path -func (m *DHTP2PPutMsg) AppendPutPath(pe *PathElement) { - pew := pe.Wire() - m.PutPath = append(m.PutPath, pew) - m.PathL++ - m.MsgSize += pew.Size() +//---------------------------------------------------------------------- + +// Update message (forwarding) +func (m *DHTP2PPutMsg) Update(p *path.Path, pf *blocks.PeerFilter, hop uint16) *DHTP2PPutMsg { + msg := NewDHTP2PPutMsg() + msg.Flags = m.Flags + msg.HopCount = hop + msg.PathL = m.PathL + msg.Expiration = m.Expiration + msg.PeerFilter = pf + msg.Key = m.Key.Clone() + msg.TruncOrigin = m.TruncOrigin + msg.PutPath = util.Clone(m.PutPath) + msg.LastSig = m.LastSig + msg.Block = util.Clone(m.Block) + msg.SetPath(p) + return msg +} + +//---------------------------------------------------------------------- +// Path handling (get/set path in message) +//---------------------------------------------------------------------- + +// Path returns the current path from message +func (m *DHTP2PPutMsg) Path(sender *util.PeerID) *path.Path { + // create a "real" path list from message data + pth := path.NewPath(crypto.Hash(m.Block), m.Expiration) + + // return empty path if recording is switched off + if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 { + return pth + } + + // handle truncate origin + if m.Flags&enums.DHT_RO_TRUNCATED == 1 { + if m.TruncOrigin == nil || len(m.TruncOrigin) == 0 { + logger.Printf(logger.WARN, "[path] truncated but no origin - flag reset") + m.Flags &^= enums.DHT_RO_TRUNCATED + } else { + pth.TruncOrigin = util.NewPeerID(m.TruncOrigin) + pth.Flags |= path.PathTruncated + } + } + + // copy path elements + pth.List = util.Clone(m.PutPath) + pth.NumList = uint16(len(pth.List)) + + // handle last hop signature + if m.LastSig == nil || len(m.LastSig) == 0 { + logger.Printf(logger.WARN, "[path] - last hop signature missing - path reset") + return path.NewPath(crypto.Hash(m.Block), m.Expiration) + } + pth.Flags |= path.PathLastHop + pth.LastSig = util.NewPeerSignature(m.LastSig) + pth.LastHop = sender + return pth +} + +// Set path in message; corrects the message size accordingly +func (m *DHTP2PPutMsg) SetPath(p *path.Path) { + + // return if recording is switched off (don't touch path) + if m.Flags&enums.DHT_RO_RECORD_ROUTE == 0 { + return + } + // compute old path size + var pes uint + if len(m.PutPath) > 0 { + pes = m.PutPath[0].Size() + } + oldSize := uint(len(m.PutPath))*pes + m.PESize("Origin") + m.PESize("LastSig") + // if no new path is defined,... + if p == nil { + // ... remove existing path + m.TruncOrigin = nil + m.PutPath = nil + m.LastSig = nil + m.PathL = 0 + m.Flags &^= enums.DHT_RO_TRUNCATED + m.MsgSize -= uint16(oldSize) + return + } + // adjust message size + m.MsgSize += uint16(p.Size() - oldSize) + + // transfer path data + if p.TruncOrigin != nil { + // truncated path + m.Flags |= enums.DHT_RO_TRUNCATED + m.TruncOrigin = p.TruncOrigin.Bytes() + } + m.PutPath = util.Clone(p.List) + m.PathL = uint16(len(m.PutPath)) + if p.LastSig != nil { + m.LastSig = p.LastSig.Bytes() + } } +//---------------------------------------------------------------------- + // String returns a human-readable representation of the message. func (m *DHTP2PPutMsg) String() string { return fmt.Sprintf("DHTP2PPutMsg{btype=%s,hops=%d,flags=%d}", @@ -274,19 +283,19 @@ func (m *DHTP2PPutMsg) Header() *Header { // DHTP2PResultMsg wire layout type DHTP2PResultMsg struct { - MsgSize uint16 `order:"big"` // total size of message - MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) - BType uint32 `order:"big"` // Block type of result - Reserved uint32 `order:"big"` // Reserved for further use - PutPathL uint16 `order:"big"` // size of PUTPATH field - GetPathL uint16 `order:"big"` // size of GETPATH field - Expires util.AbsoluteTime `` // expiration date - Query *crypto.HashCode `` // Query key for block - Origin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) - PutPath []*PathElementWire `size:"PutPathL"` // PUTPATH - GetPath []*PathElementWire `size:"GetPathL"` // GETPATH - LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) - Block []byte `size:"*"` // block data + MsgSize uint16 `order:"big"` // total size of message + MsgType uint16 `order:"big"` // DHT_P2P_RESULT (148) + BType uint32 `order:"big"` // Block type of result + Reserved uint32 `order:"big"` // Reserved for further use + PutPathL uint16 `order:"big"` // size of PUTPATH field + GetPathL uint16 `order:"big"` // size of GETPATH field + Expires util.AbsoluteTime `` // expiration date + Query *crypto.HashCode `` // Query key for block + Origin []byte `size:"(PESize)"` // truncated origin (if TRUNCATED flag set) + PutPath []*path.Entry `size:"PutPathL"` // PUTPATH + GetPath []*path.Entry `size:"GetPathL"` // GETPATH + LastSig []byte `size:"(PESize)"` // signature of last hop (if RECORD_ROUTE flag is set) + Block []byte `size:"*"` // block data } // NewDHTP2PResultMsg creates a new empty DHTP2PResultMsg diff --git a/src/gnunet/service/dht/blocks/filters.go b/src/gnunet/service/dht/blocks/filters.go @@ -43,6 +43,13 @@ func NewPeerFilter() *PeerFilter { } } +// NewPeerFilterFromBytes creates a peer filter from data. +func NewPeerFilterFromBytes(data []byte) *PeerFilter { + return &PeerFilter{ + BF: NewBloomFilterFromBytes(data), + } +} + // Add peer id to the filter func (pf *PeerFilter) Add(p *util.PeerID) { pf.BF.Add(p.Data) @@ -112,42 +119,78 @@ type ResultFilter interface { } //---------------------------------------------------------------------- -// Dummy result filter -// [Additional filters (per block type) are defined in corresponding files] +// Generic result filter: +// Filter duplicate blocks (identical hash value over content) //---------------------------------------------------------------------- -// PassResultFilter is a dummy result filter with no state. -type PassResultFilter struct{} +// GenericResultFilter is a dummy result filter with no state. +type GenericResultFilter struct { + bf *BloomFilter +} + +// NewGenericResultFilter creates a new empty result bloom filter +func NewGenericResultFilter() *GenericResultFilter { + return &GenericResultFilter{ + bf: NewBloomFilter(128), + } +} // Add a block to the result filter. -func (rf *PassResultFilter) Add(Block) { +func (rf *GenericResultFilter) Add(b Block) { + rf.bf.Add(b.Bytes()) } // Contains returns true if entry (binary representation) is filtered -func (rf *PassResultFilter) Contains(Block) bool { - return false +func (rf *GenericResultFilter) Contains(b Block) bool { + return rf.bf.Contains(b.Bytes()) } // Bytes returns the binary representation of a result filter -func (rf *PassResultFilter) Bytes() (buf []byte) { - return +func (rf *GenericResultFilter) Bytes() (buf []byte) { + return rf.bf.Bytes() } // Merge two result filters -func (rf *PassResultFilter) Merge(ResultFilter) bool { +func (rf *GenericResultFilter) Merge(t ResultFilter) bool { + // check for correct type + trf, ok := t.(*GenericResultFilter) + if !ok { + return false + } + // check for identical mutator (if any) + if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) { + return false + } + // check for same size + if len(rf.bf.Bits) != len(trf.bf.Bits) { + return false + } + // merge bloomfilters + for i := range rf.bf.Bits { + rf.bf.Bits[i] ^= trf.bf.Bits[i] + } return true } // Compare two result filters -func (rf *PassResultFilter) Compare(t ResultFilter) int { - if _, ok := t.(*PassResultFilter); ok { +func (rf *GenericResultFilter) Compare(t ResultFilter) int { + trf, ok := t.(*GenericResultFilter) + if !ok { + return CMP_DIFFER + } + // check for identical mutator (if any) + if !bytes.Equal(rf.bf.mInput, trf.bf.mInput) { + return CMP_DIFFER + } + // check for identical bits + if bytes.Equal(rf.bf.Bits, trf.bf.Bits) { return CMP_SAME } - return CMP_DIFFER + return CMP_MERGE } //====================================================================== -// Generic bllom filter with mutator +// Generic bloom filter with mutator //====================================================================== // BloomFilter is a space-efficient probabilistic datastructure to test if @@ -171,6 +214,15 @@ func NewBloomFilter(n int) *BloomFilter { } } +// NewBloomFilterFromBytes creates a new filter from data +func NewBloomFilterFromBytes(data []byte) *BloomFilter { + return &BloomFilter{ + Bits: util.Clone(data), + mInput: nil, + mData: nil, + } +} + // SetMutator to define a mutator for randomization. If 'm' is nil, // the mutator is removed from the filter (use with care!) func (bf *BloomFilter) SetMutator(m any) { diff --git a/src/gnunet/service/dht/blocks/filters_test.go b/src/gnunet/service/dht/blocks/filters_test.go @@ -21,6 +21,8 @@ package blocks import ( "bytes" "crypto/rand" + "encoding/base64" + "gnunet/util" "sort" "testing" ) @@ -108,3 +110,25 @@ func TestBloomfilter(t *testing.T) { t.Logf("FAILED with %d false-positives", count) } } + +func TestBFCase1(t *testing.T) { + senderS := "83JF73PZ69ZFVCHH9VDEGY673EH4H3B4Y4XRV8XB3PQHP8SFN220" + pfS := "AAAAABAACAAQAAAAACAAgAAAAIAAAACAAAAAAAAABAAQAAAADAAAAABA" + + "AAAAAAAAAAAQAAAAAAAAAAAACAAIAAAAAACAAABAAAAAAIgEAAAABAAACAAAAA" + + "EAAAAAAAAAAEABAAAAAAAAFIAAEAAAAAAAAAAAAABAAIAAAAAAAAA=" + + // decode sender + buf, err := util.DecodeStringToBinary(senderS, 32) + if err != nil { + t.Fatal(err) + } + sender := util.NewPeerID(buf) + + // decode peer filter + if buf, err = base64.StdEncoding.DecodeString(pfS); err != nil { + t.Fatal(err) + } + pf := NewPeerFilterFromBytes(buf) + rc := pf.Contains(sender) + t.Logf("contains? %v", rc) +} diff --git a/src/gnunet/service/dht/blocks/generic.go b/src/gnunet/service/dht/blocks/generic.go @@ -39,11 +39,14 @@ type Query interface { Key() *crypto.HashCode // Type returns the requested block type - Type() uint16 + Type() enums.BlockType // Flags returns the query flags Flags() uint16 + // Params holds additional info for queries + Params() util.ParameterSet + // Verify the integrity of a retrieved block (optional). Override in // custom query types to implement block-specific integrity checks // (see GNSQuery for example). @@ -60,12 +63,12 @@ type Query interface { // DHT Block interface type Block interface { - // Data returns the DHT block data (unstructured without type and + // Bytes returns the DHT block data (unstructured without type and // expiration information. - Data() []byte + Bytes() []byte // Return the block type - Type() uint16 + Type() enums.BlockType // Expire returns the block expiration Expire() util.AbsoluteTime @@ -82,7 +85,7 @@ type Block interface { // Unwrap (raw) block to a specific block type func Unwrap(blk Block, obj interface{}) error { - return data.Unmarshal(obj, blk.Data()) + return data.Unmarshal(obj, blk.Bytes()) } //---------------------------------------------------------------------- @@ -95,13 +98,13 @@ type GenericQuery struct { key *crypto.HashCode // block type requested - btype uint16 + btype enums.BlockType // query flags flags uint16 // Params holds additional query parameters - Params util.ParameterSet + params util.ParameterSet } // Key interface method implementation @@ -110,7 +113,7 @@ func (q *GenericQuery) Key() *crypto.HashCode { } // Type returns the requested block type -func (q *GenericQuery) Type() uint16 { +func (q *GenericQuery) Type() enums.BlockType { return q.btype } @@ -119,6 +122,11 @@ func (q *GenericQuery) Flags() uint16 { return q.flags } +// Params holds additional info for queries +func (q *GenericQuery) Params() util.ParameterSet { + return q.params +} + // Verify interface method implementation func (q *GenericQuery) Verify(b Block) error { // no verification, no errors ;) @@ -137,12 +145,12 @@ func (q *GenericQuery) String() string { } // NewGenericQuery creates a simple Query from hash code. -func NewGenericQuery(key []byte, btype enums.BlockType, flags uint16) *GenericQuery { +func NewGenericQuery(key *crypto.HashCode, btype enums.BlockType, flags uint16) *GenericQuery { return &GenericQuery{ - key: crypto.NewHashCode(key), - btype: uint16(btype), + key: key, + btype: btype, flags: flags, - Params: make(util.ParameterSet), + params: make(util.ParameterSet), } } @@ -151,17 +159,17 @@ func NewGenericQuery(key []byte, btype enums.BlockType, flags uint16) *GenericQu // GenericBlock is the block in simple binary representation type GenericBlock struct { block []byte // block data - btype uint16 // block type + btype enums.BlockType // block type expire util.AbsoluteTime // expiration date } -// Data interface method implementation -func (b *GenericBlock) Data() []byte { +// Bytes returns the binary representation +func (b *GenericBlock) Bytes() []byte { return b.block } // Type returns the block type -func (b *GenericBlock) Type() uint16 { +func (b *GenericBlock) Type() enums.BlockType { return b.btype } @@ -186,7 +194,7 @@ func (b *GenericBlock) Verify() (bool, error) { func NewGenericBlock(buf []byte) *GenericBlock { return &GenericBlock{ block: util.Clone(buf), - btype: uint16(enums.BLOCK_TYPE_ANY), // unknown block type - expire: util.AbsoluteTimeNever(), // never expires + btype: enums.BLOCK_TYPE_ANY, // unknown block type + expire: util.AbsoluteTimeNever(), // never expires } } diff --git a/src/gnunet/service/dht/blocks/gns.go b/src/gnunet/service/dht/blocks/gns.go @@ -104,7 +104,7 @@ func NewGNSQuery(zkey *crypto.ZoneKey, label string) *GNSQuery { if err != nil { logger.Printf(logger.ERROR, "[NewGNSQuery] failed: %s", err.Error()) } - gq := crypto.Hash(pd.Bytes()).Bits + gq := crypto.Hash(pd.Bytes()) return &GNSQuery{ GenericQuery: *NewGenericQuery(gq, enums.BLOCK_TYPE_GNS_NAMERECORD, 0), Zone: zkey, @@ -141,8 +141,8 @@ type GNSBlock struct { data []byte // decrypted data } -// Data block interface implementation -func (b *GNSBlock) Data() []byte { +// Bytes return th binary representation of block +func (b *GNSBlock) Bytes() []byte { buf, _ := data.Marshal(b) return buf } diff --git a/src/gnunet/service/dht/blocks/hello.go b/src/gnunet/service/dht/blocks/hello.go @@ -198,12 +198,12 @@ func (h *HelloBlock) finalize() (err error) { } // Return the block type -func (h *HelloBlock) Type() uint16 { - return uint16(enums.BLOCK_TYPE_DHT_URL_HELLO) +func (h *HelloBlock) Type() enums.BlockType { + return enums.BLOCK_TYPE_DHT_URL_HELLO } -// Data returns the raw block data -func (h *HelloBlock) Data() []byte { +// Bytes returns the raw block data +func (h *HelloBlock) Bytes() []byte { buf, err := data.Marshal(h) if err != nil { logger.Println(logger.ERROR, "[hello] Failed to serialize HELLO block: "+err.Error()) @@ -376,6 +376,8 @@ func (bh *HelloBlockHandler) FilterResult(b Block, key *crypto.HashCode, rf Resu } //---------------------------------------------------------------------- +// HELLO result filter +//---------------------------------------------------------------------- // HelloResultFilter is a result filter implementation for HELLO blocks type HelloResultFilter struct { diff --git a/src/gnunet/service/dht/local.go b/src/gnunet/service/dht/local.go @@ -0,0 +1,86 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package dht + +import ( + "errors" + "gnunet/enums" + "gnunet/service/dht/blocks" + "gnunet/service/store" + + "github.com/bfix/gospel/logger" + "github.com/bfix/gospel/math" +) + +// getHelloCache tries to find the requested HELLO block in the HELLO cache +func (m *Module) getHelloCache(label string, addr *PeerAddress, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int) { + logger.Printf(logger.DBG, "[%s] GET message for HELLO: check cache", label) + // find best cached HELLO + var block blocks.Block + block, dist = m.rtable.BestHello(addr, rf) + + // if block is filtered, skip it + if block != nil { + if !rf.Contains(block) { + entry = &store.DHTEntry{Blk: block} + } else { + logger.Printf(logger.DBG, "[%s] GET message for HELLO: matching DHT block is filtered", label) + entry = nil + dist = nil + } + } + return +} + +// getLocalStorage tries to find the requested block in local storage +func (m *Module) getLocalStorage(label string, query blocks.Query, rf blocks.ResultFilter) (entry *store.DHTEntry, dist *math.Int, err error) { + + // query DHT store for exact match (9.4.3.3c) + if entry, err = m.store.Get(query); err != nil { + logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error()) + return + } + if entry != nil { + dist = math.ZERO + // check if we are filtered out + if rf.Contains(entry.Blk) { + logger.Printf(logger.DBG, "[%s] matching DHT block is filtered", label) + entry = nil + dist = nil + } + } + // if we have no exact match, find approximate block if requested + if entry == nil || query.Flags()&enums.DHT_RO_FIND_APPROXIMATE != 0 { + // no exact match: find approximate (9.4.3.3b) + match := func(e *store.DHTEntry) bool { + return rf.Contains(e.Blk) + } + var d any + entry, d, err = m.store.GetApprox(query, match) + var ok bool + dist, ok = d.(*math.Int) + if !ok { + err = errors.New("no approx distance") + } + if err != nil { + logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT block from storage: %s", label, err.Error()) + } + } + return +} diff --git a/src/gnunet/service/dht/messages.go b/src/gnunet/service/dht/messages.go @@ -23,6 +23,8 @@ import ( "gnunet/enums" "gnunet/message" "gnunet/service/dht/blocks" + "gnunet/service/dht/path" + "gnunet/service/store" "gnunet/transport" "gnunet/util" @@ -40,11 +42,18 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // assemble log label label := "dht" if v := ctx.Value("label"); v != nil { - if s, _ := v.(string); len(s) > 0 { + if s, ok := v.(string); ok && len(s) > 0 { label = "dht-" + s } } logger.Printf(logger.INFO, "[%s] message received from %s", label, sender) + local := m.core.PeerID() + + // check for local message + if sender.Equals(local) { + logger.Printf(logger.WARN, "[%s] dropping local message received: %s", label, util.Dump(msgIn, "json")) + return false + } // process message switch msg := msgIn.(type) { @@ -54,11 +63,10 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // DHT-P2P GET //-------------------------------------------------------------- logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-GET message", label) - query := blocks.NewGenericQuery(msg.Query.Bits, enums.BlockType(msg.BType), msg.Flags) + query := blocks.NewGenericQuery(msg.Query, enums.BlockType(msg.BType), msg.Flags) - var block blocks.Block + var entry *store.DHTEntry var dist *math.Int - var err error //-------------------------------------------------------------- // validate query (based on block type requested) (9.4.3.1) @@ -79,83 +87,79 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m if !msg.PeerFilter.Contains(sender) { logger.Printf(logger.WARN, "[%s] sender not in peer filter", label) } - // parse result filter - var rf blocks.ResultFilter = new(blocks.PassResultFilter) + // parse result filter ... + var rf blocks.ResultFilter if msg.ResFilter != nil && len(msg.ResFilter) > 0 { if blockHdlr != nil { rf = blockHdlr.ParseResultFilter(msg.ResFilter) } else { logger.Printf(logger.WARN, "[%s] unknown result filter implementation -- skipped", label) } + } else { + // ... or create a new one + if blockHdlr != nil { + rf = blockHdlr.SetupResultFilter(128, util.RndUInt32()) + } else { + logger.Printf(logger.WARN, "[%s] using default result filter", label) + rf = blocks.NewGenericResultFilter() + } } // clone peer filter pf := msg.PeerFilter.Clone() //---------------------------------------------------------- // check if we need to respond (and how) (9.4.3.3) - addr := NewQueryAddress(msg.Query) - closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter) + addr := NewQueryAddress(query.Key()) + closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0) demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0 approx := int(msg.Flags)&enums.DHT_RO_FIND_APPROXIMATE != 0 // actions doResult := closest || (demux && approx) doForward := !closest || (demux && !approx) - logger.Printf(logger.DBG, "[dht] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v", - closest, demux, approx, doResult, doForward) + logger.Printf(logger.DBG, "[%s] GET message: closest=%v, demux=%v, approx=%v --> result=%v, forward=%v", + label, closest, demux, approx, doResult, doForward) //------------------------------------------------------ // query for a HELLO? (9.4.3.3a) - if msg.BType == uint32(enums.BLOCK_TYPE_DHT_URL_HELLO) { - logger.Println(logger.DBG, "[dht] GET message for HELLO: check cache") - // find best cached HELLO - block, dist = m.rtable.BestHello(addr, rf) + if btype == enums.BLOCK_TYPE_DHT_URL_HELLO { + // try to find result in HELLO cache + entry, dist = m.getHelloCache(label, addr, rf) } //-------------------------------------------------------------- - // find the closest block that has that is not filtered/ by the result + // find the closest block that has that is not filtered by the result // filter (in case we did not find an appropriate block in cache). if doResult { // save best-match values from cache - blockCache := block + entryCache := entry distCache := dist - - // query DHT store for exact match (9.4.3.3c) - if block, err = m.Get(ctx, query); err != nil { - logger.Printf(logger.ERROR, "[%s] Failed to get DHT block from storage: %s", label, err.Error()) - return true - } - // if block is filtered, skip it - if rf.Contains(block) { - logger.Println(logger.DBG, "[dht] GET message for HELLO: matching DHT block is filtered") - block = nil - } - // if we have no exact match, find approximate block if requested - if block == nil || approx { - // no exact match: find approximate (9.4.3.3b) - match := func(b blocks.Block) bool { - return rf.Contains(b) + dist = nil + + // if we don't have an exact match, try storage lookup + if entryCache == nil || (distCache != nil && !distCache.Equals(math.ZERO)) { + // get entry from local storage + var err error + if entry, dist, err = m.getLocalStorage(label, query, rf); err != nil { + entry = nil + dist = nil } - block, dist, err = m.GetApprox(ctx, query, match) - if err != nil { - logger.Printf(logger.ERROR, "[%s] Failed to get (approx.) DHT block from storage: %s", label, err.Error()) - return true + // if we have a block from cache, check if it is better than the + // block found in the DHT + if entryCache != nil && dist != nil && distCache.Cmp(dist) < 0 { + entry = entryCache + dist = distCache } } - // if we have a block from cache, check if it is better than the - // block found in the DHT - if blockCache != nil && distCache.Cmp(dist) < 0 { - block = blockCache - } // if we have a block, send it as response - if block != nil { + if entry != nil { logger.Printf(logger.INFO, "[%s] sending DHT result message to caller", label) - if err := m.sendResult(ctx, query, block, back); err != nil { + if err := m.sendResult(ctx, query, entry.Blk, back); err != nil { logger.Printf(logger.ERROR, "[%s] Failed to send DHT result message: %s", label, err.Error()) } } } // check if we need to forward message based on filter result - if block != nil && blockHdlr != nil { - switch blockHdlr.FilterResult(block, query.Key(), rf, msg.XQuery) { + if entry != nil && blockHdlr != nil { + switch blockHdlr.FilterResult(entry.Blk, query.Key(), rf, msg.XQuery) { case blocks.RF_LAST: // no need for further results case blocks.RF_MORE: @@ -167,14 +171,13 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m } if doForward { // build updated GET message - pf.Add(m.core.PeerID()) + pf.Add(local) msgOut := msg.Update(pf, rf, msg.HopCount+1) // forward to number of peers numForward := m.rtable.ComputeOutDegree(msg.ReplLevel, msg.HopCount) - key := NewQueryAddress(query.Key()) for n := 0; n < numForward; n++ { - if p := m.rtable.SelectClosestPeer(key, pf); p != nil { + if p := m.rtable.SelectClosestPeer(addr, pf, 0); p != nil { // forward message to peer logger.Printf(logger.INFO, "[%s] forward DHT get message to %s", label, p.String()) if err := back.Send(ctx, msgOut); err != nil { @@ -183,7 +186,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m pf.Add(p.Peer) // create open get-forward result handler rh := NewForwardResultHandler(msg, rf, back) - logger.Printf(logger.INFO, "[%s] DHT-P2P-GET task #%d started", label, rh.ID()) + logger.Printf(logger.INFO, "[%s] DHT-P2P-GET task #%d (%s) started", label, rh.ID(), rh.Key()) m.reshdlrs.Add(rh) } else { break @@ -198,6 +201,13 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m //---------------------------------------------------------- logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message", label) + // assemble query and entry + query := blocks.NewGenericQuery(msg.Key, enums.BlockType(msg.BType), msg.Flags) + entry := &store.DHTEntry{ + Blk: blocks.NewGenericBlock(msg.Block), + Path: nil, + } + //-------------------------------------------------------------- // check if request is expired (9.3.2.1) if msg.Expiration.Expired() { @@ -226,27 +236,93 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m logger.Printf(logger.INFO, "[%s] No validator defined for block type %s", label, btype.String()) blockHdlr = nil } + // clone peer filter + pf := msg.PeerFilter.Clone() + + //---------------------------------------------------------- + // check if we need to respond (and how) + addr := NewQueryAddress(msg.Key) + closest := m.rtable.IsClosestPeer(nil, addr, msg.PeerFilter, 0) + demux := int(msg.Flags)&enums.DHT_RO_DEMULTIPLEX_EVERYWHERE != 0 + logger.Printf(logger.DBG, "[%s] PUT message: closest=%v, demux=%v", label, closest, demux) + //-------------------------------------------------------------- // check if sender is in peer filter (9.3.2.5) if !msg.PeerFilter.Contains(sender) { logger.Printf(logger.WARN, "[%s] Sender not in peer filter", label) } //-------------------------------------------------------------- - // check if route is recorded (9.3.2.6) - /* - withPath := msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 - if withPath { - spe := message.NewPathElement(msg.Key, sender, p) - m.core.Sign(spe) - msg.AppendPutPath(spe) - } - */ - //-------------------------------------------------------------- // verify PUT path (9.3.2.7) - if msg.PathL > 0 { + // 'entry.Path' will be used as path in stored and forwarded messages. + // The resulting path is always valid; it is truncated/reset on + // signature failure. + entry.Path = msg.Path(sender) + entry.Path.Verify(local) + //-------------------------------------------------------------- + // store locally if we are closest peer or demux is set (9.3.2.8) + if closest || demux { + // store in local storage + if err := m.store.Put(query, entry); err != nil { + logger.Printf(logger.ERROR, "[%s] failed to store DHT entry: %s", label, err.Error()) + } } + //-------------------------------------------------------------- + // if the put is for a HELLO block, add the sender to the + // routing table (9.3.2.9) + if btype == enums.BLOCK_TYPE_DHT_HELLO { + // get addresses from HELLO block + hello, err := blocks.ParseHelloFromBytes(msg.Block) + if err != nil { + logger.Printf(logger.ERROR, "[%s] failed to parse HELLO block: %s", label, err.Error()) + } else { + // check state of bucket for given address + if m.rtable.Check(NewPeerAddress(sender)) == 0 { + // we could add the sender to the routing table + for _, addr := range hello.Addresses() { + if transport.CanHandleAddress(addr) { + // try to connect to peer (triggers EV_CONNECTED on success) + m.core.TryConnect(sender, addr) + } + } + } + } + } + + //-------------------------------------------------------------- + // check if we need to forward + if !closest || demux { + // add local node to filter + pf.Add(local) + + // forward to computed number of peers + numForward := m.rtable.ComputeOutDegree(msg.ReplLvl, msg.HopCount) + for n := 0; n < numForward; n++ { + if p := m.rtable.SelectClosestPeer(addr, pf, 0); p != nil { + // check if route is recorded (9.3.2.6) + var pp *path.Path + if msg.Flags&enums.DHT_RO_RECORD_ROUTE != 0 { + // yes: add path element + pp = entry.Path.Clone() + pe := pp.NewElement(sender, local, p.Peer) + pp.Add(pe) + } + // build updated PUT message + msgOut := msg.Update(pp, pf, msg.HopCount+1) + + // forward message to peer + logger.Printf(logger.INFO, "[%s] forward DHT put message to %s", label, p.String()) + if err := back.Send(ctx, msgOut); err != nil { + logger.Printf(logger.ERROR, "[%s] Failed to forward DHT put message: %s", label, err.Error()) + } + // add forward node to filter + pf.Add(p.Peer) + } else { + break + } + } + } logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-PUT message done", label) case *message.DHTP2PResultMsg: @@ -257,18 +333,21 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // check task list for handler key := msg.Query.String() + logger.Printf(logger.DBG, "[%s] DHT-P2P-RESULT key = %s", label, key) handled := false if list, ok := m.reshdlrs.Get(key); ok { for _, rh := range list { logger.Printf(logger.DBG, "[%s] Task #%d for DHT-P2P-RESULT found", label, rh.ID()) // handle the message go rh.Handle(ctx, msg) + handled = true } return true } if !handled { logger.Printf(logger.WARN, "[%s] DHT-P2P-RESULT not processed (no handler)", label) } + logger.Printf(logger.INFO, "[%s] Handling DHT-P2P-RESULT message done", label) return handled case *message.DHTP2PHelloMsg: @@ -279,16 +358,16 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m // verify integrity of message if ok, err := msg.Verify(sender); !ok || err != nil { - logger.Println(logger.WARN, "[dht] Received invalid DHT_P2P_HELLO message") + logger.Printf(logger.WARN, "[%s] Received invalid DHT_P2P_HELLO message", label) if err != nil { - logger.Println(logger.ERROR, "[dht] --> "+err.Error()) + logger.Printf(logger.ERROR, "[%s] --> %s", label, err.Error()) } return false } - // keep peer addresses in core for transport + // keep peer addresses in core for transports aList, err := msg.Addresses() if err != nil { - logger.Println(logger.ERROR, "[dht] Failed to parse addresses from DHT_P2P_HELLO message") + logger.Printf(logger.ERROR, "[%s] Failed to parse addresses from DHT_P2P_HELLO message", label) return false } if newPeer := m.core.Learn(ctx, sender, aList); newPeer { @@ -297,7 +376,7 @@ func (m *Module) HandleMessage(ctx context.Context, sender *util.PeerID, msgIn m if msgOut, err = m.getHello(); err != nil { return false } - logger.Printf(logger.INFO, "[dht] Sending HELLO to %s: %s", sender, msgOut) + logger.Printf(logger.INFO, "[%s] Sending HELLO to %s: %s", label, sender, msgOut) err = m.core.Send(ctx, sender, msgOut) // no error if the message might have been sent if err == transport.ErrEndpMaybeSent { @@ -373,7 +452,7 @@ func (m *Module) sendResult(ctx context.Context, query blocks.Query, blk blocks. out.BType = uint32(query.Type()) out.Expires = blk.Expire() out.Query = query.Key() - out.Block = blk.Data() + out.Block = blk.Bytes() out.MsgSize += uint16(len(out.Block)) // send message return back.Send(ctx, out) diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -23,11 +23,11 @@ import ( "errors" "gnunet/config" "gnunet/core" + "gnunet/crypto" "gnunet/message" "gnunet/service" "gnunet/service/dht/blocks" "gnunet/service/store" - "gnunet/transport" "gnunet/util" gmath "math" "time" @@ -41,6 +41,26 @@ import ( //====================================================================== //---------------------------------------------------------------------- +// Responder for local message handling +//---------------------------------------------------------------------- + +type LocalResponder struct { + ch chan blocks.Block // out-going channel for incoming blocks +} + +func (lr *LocalResponder) Send(ctx context.Context, msg message.Message) error { + return nil +} + +func (lr *LocalResponder) Receiver() string { + return "@" +} + +func (lr *LocalResponder) Close() { + close(lr.ch) +} + +//---------------------------------------------------------------------- // Put and get blocks into/from a DHT. //---------------------------------------------------------------------- @@ -83,26 +103,112 @@ func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Mod } //---------------------------------------------------------------------- +// DHT methods for local use +//---------------------------------------------------------------------- -// Get a block from the DHT ["dht:get"] -func (m *Module) Get(ctx context.Context, query blocks.Query) (block blocks.Block, err error) { - return m.store.Get(query) +// Get blocks from the DHT ["dht:get"] +// Locally request blocks for a given query. The res channel will deliver the +// returned results to the caller; the channel is closed if no further blocks +// are expected or the query times out. +func (m *Module) Get(ctx context.Context, query blocks.Query) (res chan blocks.Block) { + // get the block handler for given block type to construct an empty + // result filter. If no handler is defined, a default PassResultFilter + // is created. + var rf blocks.ResultFilter = new(blocks.GenericResultFilter) + blockHdlr, ok := blocks.BlockHandlers[query.Type()] + if ok { + // create result filter + rf = blockHdlr.SetupResultFilter(128, util.RndUInt32()) + } else { + logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped") + } + // get additional query parameters + xquery, ok := util.GetParam[[]byte](query.Params(), "xquery") + + // assemble a new GET message + msg := message.NewDHTP2PGetMsg() + msg.BType = uint32(query.Type()) + msg.Flags = query.Flags() + msg.HopCount = 0 + msg.ReplLevel = 10 + msg.PeerFilter = blocks.NewPeerFilter() + msg.ResFilter = rf.Bytes() + msg.RfSize = uint16(len(msg.ResFilter)) + msg.XQuery = xquery + msg.MsgSize += msg.RfSize + uint16(len(xquery)) + + // compose a response channel and handler + res = make(chan blocks.Block) + hdlr := &LocalResponder{ + ch: res, + } + // time-out handling + ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout") + if !ok { + // defaults to 10 minutes + ttl = 600 * time.Second + } + lctx, cancel := context.WithTimeout(ctx, ttl) + + // send message + self := m.core.PeerID() + msg.PeerFilter.Add(self) + go m.HandleMessage(lctx, self, msg, hdlr) + go func() { + <-lctx.Done() + hdlr.Close() + cancel() + }() + return res } // GetApprox returns the first block not excluded ["dht:getapprox"] -func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl func(blocks.Block) bool) (block blocks.Block, dist *math.Int, err error) { - var d any - block, d, err = m.store.GetApprox(query, excl) - dist, _ = d.(*math.Int) +func (m *Module) GetApprox(ctx context.Context, query blocks.Query, excl func(*store.DHTEntry) bool) (entry *store.DHTEntry, dist *math.Int, err error) { + var val any + if entry, val, err = m.store.GetApprox(query, excl); err != nil { + return + } + hc, ok := val.(*crypto.HashCode) + if !ok { + err = errors.New("no approx result") + } + asked := NewQueryAddress(query.Key()) + found := NewQueryAddress(hc) + dist, _ = found.Distance(asked) return } // Put a block into the DHT ["dht:put"] -func (m *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) error { - return m.store.Put(key, block) +func (m *Module) Put(ctx context.Context, query blocks.Query, block blocks.Block) error { + // get additional query parameters + expire, ok := util.GetParam[util.AbsoluteTime](query.Params(), "expire") + if !ok { + expire = util.AbsoluteTimeNever() + } + // assemble a new PUT message + msg := message.NewDHTP2PPutMsg() + msg.BType = uint32(query.Type()) + msg.Flags = query.Flags() + msg.HopCount = 0 + msg.PeerFilter = blocks.NewPeerFilter() + msg.ReplLvl = 10 + msg.Expiration = expire + msg.Block = block.Bytes() + msg.Key = query.Key().Clone() + msg.TruncOrigin = nil + msg.PutPath = nil + msg.LastSig = nil + msg.MsgSize += uint16(len(msg.Block)) + + // send message + go m.HandleMessage(ctx, nil, msg, nil) + + return nil } //---------------------------------------------------------------------- +// Event handling +//---------------------------------------------------------------------- // Filter returns the event filter for the module func (m *Module) Filter() *core.EventFilter { @@ -133,31 +239,32 @@ func (m *Module) event(ctx context.Context, ev *core.Event) { // New peer connected: case core.EV_CONNECT: // Add peer to routing table - logger.Printf(logger.INFO, "[dht] Peer %s connected", ev.Peer) - m.rtable.Add(NewPeerAddress(ev.Peer)) + logger.Printf(logger.INFO, "[dht-event] Peer %s connected", ev.Peer) + m.rtable.Add(NewPeerAddress(ev.Peer), "dht-event") // Peer disconnected: case core.EV_DISCONNECT: // Remove peer from routing table - logger.Printf(logger.INFO, "[dht] Peer %s disconnected", ev.Peer) - m.rtable.Remove(NewPeerAddress(ev.Peer)) + logger.Printf(logger.INFO, "[dht-event] Peer %s disconnected", ev.Peer) + m.rtable.Remove(NewPeerAddress(ev.Peer), 0) // Message received. case core.EV_MESSAGE: - logger.Printf(logger.INFO, "[dht] Message received: %s", ev.Msg.String()) + logger.Printf(logger.INFO, "[dht-event] Message received: %s", ev.Msg.String()) // check if peer is in routing table (connected peer) if !m.rtable.Contains(NewPeerAddress(ev.Peer)) { - logger.Printf(logger.WARN, "[dht] message %d from unregistered peer -- discarded", ev.Msg.Header().MsgType) + logger.Printf(logger.WARN, "[dht-event] message %d from unregistered peer -- discarded", ev.Msg.Header().MsgType) return } // process message if !m.HandleMessage(ctx, ev.Peer, ev.Msg, ev.Resp) { - logger.Println(logger.WARN, "[dht] Message NOT handled!") + logger.Println(logger.WARN, "[dht-event] Message NOT handled!") } } } +//---------------------------------------------------------------------- // Heartbeat handler for periodic tasks func (m *Module) heartbeat(ctx context.Context) { // run heartbeat for routing table @@ -167,6 +274,10 @@ func (m *Module) heartbeat(ctx context.Context) { m.reshdlrs.Cleanup() } +//---------------------------------------------------------------------- +// HELLO handling +//---------------------------------------------------------------------- + // Send the currently active HELLO to given network address func (m *Module) SendHello(ctx context.Context, addr *util.Address) (err error) { // get (buffered) HELLO @@ -217,7 +328,7 @@ func (m *Module) getHello() (msg *message.DHTP2PHelloMsg, err error) { logger.Println(logger.ERROR, err.Error()) return } - logger.Println(logger.DBG, "[dht] New HELLO: "+transport.Dump(msg, "hex")) + logger.Println(logger.DBG, "[dht] New HELLO: "+util.Dump(msg, "hex")) return } // we have a valid HELLO for re-use. diff --git a/src/gnunet/service/dht/path/elements.go b/src/gnunet/service/dht/path/elements.go @@ -0,0 +1,129 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package path + +import ( + "encoding/hex" + "errors" + "fmt" + "gnunet/crypto" + "gnunet/enums" + "gnunet/util" + + "github.com/bfix/gospel/data" + "github.com/bfix/gospel/logger" +) + +// Error values +var ( + ErrPathNoSig = errors.New("missing signature for path element verification") +) + +//---------------------------------------------------------------------- +// Entry is an element of the path list +type Entry struct { + Signer *util.PeerID // path element signer + Signature *util.PeerSignature // path element signature +} + +// Size returns the size of a path element in wire format +func (e *Entry) Size() uint { + return e.Signer.Size() + e.Signature.Size() +} + +// String returns a human-readable representation +func (e *Entry) String() string { + s := hex.EncodeToString(e.Signature.Bytes()) + num := len(s) + if num > 16 { + s = s[:8] + ".." + s[num-8:] + } + return fmt.Sprintf("(%s,%s)", e.Signer.String(), s) +} + +//---------------------------------------------------------------------- +// shared path element data across types +type elementData struct { + Expiration util.AbsoluteTime // expiration date + BlockHash *crypto.HashCode // block hash + PeerPredecessor *util.PeerID // predecessor peer + PeerSuccessor *util.PeerID // successor peer +} + +// helper type for signature creation/verification +type elementSignedData struct { + Size uint16 `order:"big"` // size of signed data + Purpose uint16 `order:"big"` // signature purpose (SIG_DHT_HOP) + Elem *elementData `` // path element data +} + +//---------------------------------------------------------------------- +// Element is the full-fledged data assembly for a path element in +// PUT/GET pathes. It is assembled programatically (on generation[1] and +// verification[2]) and not transferred in messages directly. +// +// [1] spe = &Element{...} +// core.Sign(spe) +// msg.putpath[i] = spe.Wire() +// +// [2] pe = &Element{...,Signature: wire.sig} +// if !pe.Verify(peerId) { ... } +// +type Element struct { + elementData + Entry +} + +// SignedData gets the data to be signed by peer ('Signable' interface) +func (pe *Element) SignedData() []byte { + sd := &elementSignedData{ + Size: 80, + Purpose: uint16(enums.SIG_DHT_HOP), + Elem: &(pe.elementData), + } + buf, err := data.Marshal(sd) + if err != nil { + logger.Println(logger.ERROR, "can't serialize path element for signature") + return nil + } + return buf +} + +// SetSignature stores the generated signature. +func (pe *Element) SetSignature(sig *util.PeerSignature) error { + pe.Signature = sig + return nil +} + +// Wire returns the path element suitable for inclusion into messages +func (pe *Element) Wire() *Entry { + return &(pe.Entry) +} + +// Verify signature for a path element. If the signature argument +// is zero, use the signature store with the element +func (pe *Element) Verify(sig *util.PeerSignature) (bool, error) { + if sig == nil { + sig = pe.Signature + if sig == nil { + return false, ErrPathNoSig + } + } + return pe.Signer.Verify(pe.SignedData(), sig) +} diff --git a/src/gnunet/service/dht/path/handling.go b/src/gnunet/service/dht/path/handling.go @@ -0,0 +1,263 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package path + +import ( + "bytes" + "encoding/hex" + "fmt" + "gnunet/crypto" + "gnunet/util" + + "github.com/bfix/gospel/data" + "github.com/bfix/gospel/logger" +) + +//---------------------------------------------------------------------- +// Path handling +//---------------------------------------------------------------------- + +// path flags +const ( + PathTruncated = iota + PathLastHop +) + +// Path is the complete list of verified hops a message travelled. +// It also keeps the associated block hash and expiration time of +// the request for signature verification purposes. +type Path struct { + Flags uint32 `order:"big"` // flags + BlkHash *crypto.HashCode `` // block hash value + Expire util.AbsoluteTime `` // expiration time + TruncOrigin *util.PeerID `opt:"(IsUsed)"` // truncated origin (optional) + NumList uint16 `order:"big"` // number of list entries + List []*Entry `size:"NumList"` // list of path entries + LastSig *util.PeerSignature `opt:"(Isused)"` // last hop signature + LastHop *util.PeerID `opt:"(IsUsed)"` // last hop peer id +} + +// IsUsed checks if an optional field is used +func (p *Path) IsUsed(field string) bool { + switch field { + case "TruncOrigin": + return p.Flags&PathTruncated != 0 + case "LastSig", "LastHop": + return p.Flags&PathLastHop != 0 + } + return false +} + +// NewPath returns a new, empty path +func NewPath(bh *crypto.HashCode, expire util.AbsoluteTime) *Path { + return &Path{ + Flags: 0, + BlkHash: bh, + Expire: expire, + TruncOrigin: nil, + NumList: 0, + List: make([]*Entry, 0), + LastSig: nil, + LastHop: nil, + } +} + +// NewPathFromBytes reconstructs a path instance from binary data. The layout +// of the data must match with the layout used in Path.Bytes(). +func NewPathFromBytes(buf []byte) (path *Path, err error) { + if buf == nil || len(buf) == 0 { + return + } + path = new(Path) + err = data.Unmarshal(&path, buf) + return +} + +// Size of the binary representation (in message) +func (p *Path) Size() uint { + var size uint + if p.TruncOrigin != nil { + size += p.TruncOrigin.Size() + } + size += uint(p.NumList) * p.List[0].Size() + if p.LastSig != nil { + size += p.LastSig.Size() + p.LastHop.Size() + } + return size +} + +// Bytes returns a binary representation +func (p *Path) Bytes() []byte { + buf, _ := data.Marshal(p) + return buf +} + +// Clone path instance +func (p *Path) Clone() *Path { + return &Path{ + Flags: p.Flags, + BlkHash: p.BlkHash, + Expire: p.Expire, + TruncOrigin: p.TruncOrigin, + NumList: p.NumList, + List: util.Clone(p.List), + LastSig: p.LastSig, + LastHop: p.LastHop, + } +} + +// NewElement creates a new path element from data +func (p *Path) NewElement(pred, signer, succ *util.PeerID) *Element { + return &Element{ + elementData: elementData{ + Expiration: p.Expire, + BlockHash: p.BlkHash, + PeerPredecessor: pred, + PeerSuccessor: succ, + }, + Entry: Entry{ + Signer: signer, + Signature: nil, + }, + } +} + +// Add new path element with signature (append to path) +func (p *Path) Add(elem *Element) { + // append path element if we have a last hop signature + if p.LastSig != nil { + e := &Entry{ + Signer: elem.PeerPredecessor, + Signature: p.LastSig, + } + p.List = append(p.List, e) + p.NumList++ + } + // update last hop signature + p.LastSig = elem.Signature + p.LastHop = elem.Signer + p.Flags |= PathLastHop +} + +// Verify path: process list entries from right to left (decreasing index). +// If an invalid signature is encountered, the path is truncated; only checked +// elements up to this point are included in the path (left trim). +// The method does not return a state; if the verification fails, the path is +// corrected (truncated or deleted) and would always verify OK. +func (p *Path) Verify(local *util.PeerID) { + + // do we have path elements? + if len(p.List) == 0 { + // no elements: last hop signature available? + if p.LastSig == nil { + // no: nothing to verify + return + } + // get predecessor (either 0 or truncated origins) + pred := util.NewPeerID(nil) + if p.TruncOrigin != nil { + pred = p.TruncOrigin + } + // check last hop signature + pe := p.NewElement(pred, p.LastHop, local) + ok, err := pe.Verify(p.LastSig) + if err != nil || !ok { + // remove last hop signature and truncated origin; reset flags + p.LastSig = nil + p.LastHop = nil + p.TruncOrigin = nil + p.Flags = 0 + } + return + } else { + // yes: process list of path elements + signer := p.LastHop + sig := p.LastSig + succ := local + num := len(p.List) + var pred *util.PeerID + for i := num - 1; i >= 0; i-- { + if i == -1 { + if p.TruncOrigin != nil { + pred = p.TruncOrigin + } else { + pred = util.NewPeerID(nil) + } + } else { + pred = p.List[i].Signer + } + pe := p.NewElement(pred, signer, succ) + ok, err := pe.Verify(sig) + if err != nil || !ok { + // we need to truncate: + logger.Printf(logger.WARN, "[path] Truncating path (invalid signature at hop %d)", i) + + // are we at the end of the list? + if i == num-1 { + // yes: the last hop signature failed -> reset path + p.LastSig = nil + p.LastHop = nil + p.TruncOrigin = nil + p.Flags = 0 + p.List = make([]*Entry, 0) + return + } + // trim list + p.Flags |= PathTruncated + p.TruncOrigin = signer + size := num - 2 - i + list := make([]*Entry, size) + if size > 0 { + copy(list, p.List[i+2:]) + } + p.List = list + return + } + // check next path element + succ = signer + signer = pred + if i != -1 { + sig = p.List[i].Signature + } + } + } +} + +// String returs a uman-readbale representation +func (p *Path) String() string { + buf := new(bytes.Buffer) + s := "0" + if p.TruncOrigin != nil { + s = p.TruncOrigin.String() + } + buf.WriteString(fmt.Sprintf("{to=%s, (%d)[", s, len(p.List))) + for _, e := range p.List { + buf.WriteString(e.String()) + } + s = "0" + if p.LastSig != nil { + s = hex.EncodeToString(p.LastSig.Bytes()) + } + num := len(s) + if num > 16 { + s = s[:8] + ".." + s[num-8:] + } + buf.WriteString(fmt.Sprintf("], ls=%s}", s)) + return buf.String() +} diff --git a/src/gnunet/service/dht/path/handling_test.go b/src/gnunet/service/dht/path/handling_test.go @@ -0,0 +1,133 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package path + +import ( + "gnunet/crypto" + "gnunet/util" + "testing" + + "github.com/bfix/gospel/crypto/ed25519" +) + +type hop struct { + peerid *util.PeerID + seckey *ed25519.PrivateKey +} + +func newHop() *hop { + h := new(hop) + var pk *ed25519.PublicKey + pk, h.seckey = ed25519.NewKeypair() + h.peerid = util.NewPeerID(pk.Bytes()) + return h +} + +func sign(sd []byte, pk *ed25519.PrivateKey) (sig *util.PeerSignature, err error) { + var s *ed25519.EdSignature + if s, err = pk.EdSign(sd); err != nil { + return + } + sig = util.NewPeerSignature(s.Bytes()) + return +} + +func GenerateTestPath(n int) (pth *Path, local *util.PeerID, err error) { + // create hops + hops := make([]*hop, n) + for i := range hops { + hops[i] = newHop() + } + // start with empty path + pth = NewPath(crypto.NewHashCode(nil), util.AbsoluteTimeNever()) + //fmt.Println("Empty path: " + pth.String()) + + // build path + pred := util.NewPeerID(nil) + for i := 0; i < n-1; i++ { + pe := pth.NewElement(pred, hops[i].peerid, hops[i+1].peerid) + if pe.Signature, err = sign(pe.SignedData(), hops[i].seckey); err != nil { + return + } + pth.Add(pe) + //fmt.Printf("[%d] %s\n", i, pth.String()) + pred = hops[i].peerid + } + local = hops[n-1].peerid + return +} + +func TestPathSimple(t *testing.T) { + + n := 10 + + pth, local, err := GenerateTestPath(n) + if err != nil { + t.Fatal(err) + } + ps1 := pth.String() + pth.Verify(local) + ps2 := pth.String() + if ps1 != ps2 { + t.Fatal("path mismatch") + } +} + +func TestPathBadElemSig(t *testing.T) { + + n := 10 + + for f := 0; ; f++ { + pth, local, err := GenerateTestPath(n) + if err != nil { + t.Fatal(err) + } + if f >= len(pth.List) { + break + } + // invalidate signature + pth.List[f].Signature = util.NewPeerSignature(nil) + pth.Verify(local) + ps3 := pth.String() + pth.Verify(local) + ps4 := pth.String() + if ps3 != ps4 { + t.Fatal("truncated path mismatch") + } + } +} + +func TestPathBadLastSig(t *testing.T) { + + n := 10 + + pth, local, err := GenerateTestPath(n) + if err != nil { + t.Fatal(err) + } + // invalidate signature + pth.LastSig = util.NewPeerSignature(nil) + pth.Verify(local) + ps3 := pth.String() + pth.Verify(local) + ps4 := pth.String() + if ps3 != ps4 { + t.Fatal("truncated path mismatch") + } +} diff --git a/src/gnunet/service/dht/resulthandler.go b/src/gnunet/service/dht/resulthandler.go @@ -67,10 +67,10 @@ type ResultHandler interface { // Compare return values //nolint:stylecheck // allow non-camel-case in constants const ( - RHC_SAME = blocks.CMP_SAME // the two result handlers are the same - RHC_MERGE = blocks.CMP_MERGE // the two result handlers can be merged - RHC_DIFFER = blocks.CMP_DIFFER // the two result handlers are different - RHC_SIBL = blocks.CMP_1 // the two result handlers are siblings + RHC_SAME = blocks.CMP_SAME // the two result handlers are the same + RHC_MERGE = blocks.CMP_MERGE // the two result handlers can be merged + RHC_DIFFER = blocks.CMP_DIFFER // the two result handlers are different + RHC_REPLACE = blocks.CMP_1 // the two result handlers are siblings ) //---------------------------------------------------------------------- @@ -119,13 +119,21 @@ func (t *GenericResultHandler) Done() bool { // Compare two handlers func (t *GenericResultHandler) Compare(h *GenericResultHandler) int { - if t.key.Equals(h.key) || + // check if base attributes differ + if !t.key.Equals(h.key) || t.btype != h.btype || t.flags != h.flags || !bytes.Equal(t.xQuery, h.xQuery) { + logger.Printf(logger.DBG, "[grh] base fields differ") return RHC_DIFFER } - return t.resFilter.Compare(h.resFilter) + // compare result filters; if they are different, replace + // the old filter with the new one + rc := t.resFilter.Compare(h.resFilter) + if rc == RHC_DIFFER { + rc = RHC_REPLACE + } + return rc } // Merge two result handlers that are the same except for result filter @@ -133,6 +141,16 @@ func (t *GenericResultHandler) Merge(a *GenericResultHandler) bool { return t.resFilter.Merge(a.resFilter) } +// Proceed return true if the message is to be processed in derived implementations +func (t *GenericResultHandler) Proceed(ctx context.Context, msg *message.DHTP2PResultMsg) bool { + block := blocks.NewGenericBlock(msg.Block) + if !t.resFilter.Contains(block) { + t.resFilter.Add(block) + return true + } + return false +} + //---------------------------------------------------------------------- // Result handler for forwarded GET requests //---------------------------------------------------------------------- @@ -159,6 +177,11 @@ func NewForwardResultHandler(msgIn message.Message, rf blocks.ResultFilter, back // Handle incoming DHT-P2P-RESULT message func (t *ForwardResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { + // don't send result if it is filtered out + if !t.Proceed(ctx, msg) { + logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) + return false + } // send result message back to originator (result forwarding). logger.Printf(logger.INFO, "[dht-task-%d] sending result back to originator", t.id) if err := t.resp.Send(ctx, msg); err != nil && err != transport.ErrEndpMaybeSent { @@ -173,10 +196,12 @@ func (t *ForwardResultHandler) Compare(h ResultHandler) int { // check for correct handler type ht, ok := h.(*ForwardResultHandler) if !ok { + logger.Println(logger.DBG, "[frh] can't compare apples with oranges") return RHC_DIFFER } // check for same recipient if ht.resp.Receiver() != t.resp.Receiver() { + logger.Printf(logger.DBG, "[frh] recipients differ: %s -- %s", ht.resp.Receiver(), t.resp.Receiver()) return RHC_DIFFER } // check generic handler data @@ -238,6 +263,11 @@ func NewDirectResultHandler(msgIn message.Message, rf blocks.ResultFilter, hdlr // Handle incoming DHT-P2P-RESULT message func (t *DirectResultHandler) Handle(ctx context.Context, msg *message.DHTP2PResultMsg) bool { + // don't send result if it is filtered out + if !t.Proceed(ctx, msg) { + logger.Printf(logger.DBG, "[dht-task-%d] result filtered out -- already known", t.id) + return false + } // check for correct message type and handler function if t.hdlr != nil { logger.Printf(logger.INFO, "[dht-task-%d] handling result message", t.id) @@ -289,7 +319,7 @@ func NewResultHandlerList() *ResultHandlerList { func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { // get current list of handlers for key key := hdlr.Key() - list, ok := t.list.Get(key) + list, ok := t.list.Get(key, 0) modified := false if !ok { list = make([]ResultHandler, 0) @@ -300,18 +330,23 @@ func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { switch h.Compare(hdlr) { case RHC_SAME: // already in list; no need to add again + logger.Println(logger.DBG, "[rhl] SAME") return false case RHC_MERGE: // merge the two result handlers + oldMod := modified modified = h.Merge(hdlr) || modified + logger.Printf(logger.DBG, "[rhl] MERGE (%v -- %v)", oldMod, modified) break loop - case RHC_SIBL: + case RHC_REPLACE: // replace the old handler with the new one + logger.Println(logger.DBG, "[rhl] REPLACE") list[i] = hdlr modified = true break loop case RHC_DIFFER: // try next + logger.Println(logger.DBG, "[rhl] DIFFER") } } } @@ -319,18 +354,18 @@ func (t *ResultHandlerList) Add(hdlr ResultHandler) bool { // append new handler to list list = append(list, hdlr) } - t.list.Put(key, list) + t.list.Put(key, list, 0) return true } // Get handler list for given key func (t *ResultHandlerList) Get(key string) ([]ResultHandler, bool) { - return t.list.Get(key) + return t.list.Get(key, 0) } // Cleanup removes expired tasks from list func (t *ResultHandlerList) Cleanup() { - err := t.list.ProcessRange(func(key string, list []ResultHandler) error { + err := t.list.ProcessRange(func(key string, list []ResultHandler, pid int) error { var newList []ResultHandler changed := false for _, rh := range list { @@ -341,7 +376,7 @@ func (t *ResultHandlerList) Cleanup() { } } if changed { - t.list.Put(key, newList) + t.list.Put(key, newList, pid) } return nil }, false) diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go @@ -107,7 +107,7 @@ type RoutingTable struct { buckets []*Bucket // list of buckets list *util.Map[string, *PeerAddress] // keep list of peers l2nse float64 // log2 of estimated network size - inProcess bool // flag if Process() is running + inProcess map[int]struct{} // flag if Process() is running cfg *config.RoutingConfig // routing parameters helloCache *util.Map[string, *blocks.HelloBlock] // HELLO block cache } @@ -120,7 +120,7 @@ func NewRoutingTable(ref *PeerAddress, cfg *config.RoutingConfig) *RoutingTable list: util.NewMap[string, *PeerAddress](), buckets: make([]*Bucket, 512), l2nse: -1, - inProcess: false, + inProcess: make(map[int]struct{}), cfg: cfg, helloCache: util.NewMap[string, *blocks.HelloBlock](), } @@ -137,33 +137,51 @@ func NewRoutingTable(ref *PeerAddress, cfg *config.RoutingConfig) *RoutingTable // Add new peer address to routing table. // Returns true if the entry was added, false otherwise. -func (rt *RoutingTable) Add(p *PeerAddress) bool { +func (rt *RoutingTable) Add(p *PeerAddress, label string) bool { k := p.String() - logger.Printf(logger.DBG, "[RT] Add(%s)", k) + logger.Printf(logger.DBG, "[%s] Add(%s)", label, k) // check if peer is already known - if px, ok := rt.list.Get(k); ok { - logger.Println(logger.DBG, "[RT] --> already known") + if px, ok := rt.list.Get(k, 0); ok { + logger.Printf(logger.DBG, "[%s] --> already known", label) px.lastSeen = util.AbsoluteTimeNow() return false } - // compute distance (bucket index) and insert address. _, idx := p.Distance(rt.ref) if rt.buckets[idx].Add(p) { - logger.Println(logger.DBG, "[RT] --> entry added") + logger.Printf(logger.DBG, "[%s] --> entry added", label) p.lastUsed = util.AbsoluteTimeNow() - rt.list.Put(k, p) + rt.list.Put(k, p, 0) return true } // Full bucket: we did not add the address to the routing table. - logger.Println(logger.DBG, "[RT] --> bucket full -- discarded") + logger.Printf(logger.DBG, "[%s] --> bucket[%d] full -- discarded", label, idx) return false } +// check if peer address is in routing table (=1) or if the corresponding +// k-bucket has free space (=0) or not (-1). +func (rt *RoutingTable) Check(p *PeerAddress) int { + k := p.String() + + // check if peer is already known + if px, ok := rt.list.Get(k, 0); ok { + px.lastSeen = util.AbsoluteTimeNow() + return 1 + } + // compute distance (bucket index) + _, idx := p.Distance(rt.ref) + + if rt.buckets[idx].FreeSpace() > 0 { + return 0 + } + return -1 +} + // Remove peer address from routing table. // Returns true if the entry was removed, false otherwise. -func (rt *RoutingTable) Remove(p *PeerAddress) bool { +func (rt *RoutingTable) Remove(p *PeerAddress, pid int) bool { k := p.String() logger.Printf(logger.DBG, "[RT] Remove(%s)", k) @@ -177,9 +195,9 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool { // remove from internal list logger.Println(logger.DBG, "[RT] --> entry removed from internal lists only") } - rt.list.Delete(k) + rt.list.Delete(k, 0) // delete from HELLO cache - rt.helloCache.Delete(p.Peer.String()) + rt.helloCache.Delete(p.Peer.String(), pid) return rc } @@ -189,10 +207,10 @@ func (rt *RoutingTable) Contains(p *PeerAddress) bool { logger.Printf(logger.DBG, "[RT] Contains(%s)?", k) // check for peer in internal list - px, ok := rt.list.Get(k) + px, ok := rt.list.Get(k, 0) if !ok { logger.Println(logger.DBG, "[RT] --> NOT found in current list:") - _ = rt.list.ProcessRange(func(key string, val *PeerAddress) error { + _ = rt.list.ProcessRange(func(key string, val *PeerAddress, _ int) error { logger.Printf(logger.DBG, "[RT] * %s", val) return nil }, true) @@ -206,16 +224,17 @@ func (rt *RoutingTable) Contains(p *PeerAddress) bool { //---------------------------------------------------------------------- // Process a function f in the locked context of a routing table -func (rt *RoutingTable) Process(f func() error, readonly bool) error { +func (rt *RoutingTable) Process(f func(pid int) error, readonly bool) error { // handle locking - rt.lock(readonly) - rt.inProcess = true + rt.lock(readonly, 0) + pid := util.NextID() + rt.inProcess[pid] = struct{}{} defer func() { - rt.inProcess = false - rt.unlock(readonly) + delete(rt.inProcess, pid) + rt.unlock(readonly, 0) }() // call function in unlocked context - return f() + return f(pid) } //---------------------------------------------------------------------- @@ -223,10 +242,10 @@ func (rt *RoutingTable) Process(f func() error, readonly bool) error { //---------------------------------------------------------------------- // SelectClosestPeer for a given peer address and peer filter. -func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, pf *blocks.PeerFilter) (n *PeerAddress) { +func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, pf *blocks.PeerFilter, pid int) (n *PeerAddress) { // no writer allowed - rt.RLock() - defer rt.RUnlock() + rt.lock(true, pid) + defer rt.unlock(true, pid) // find closest peer in routing table var dist *math.Int @@ -245,15 +264,15 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, pf *blocks.PeerFilter) // SelectRandomPeer returns a random address from table (that is not // included in the bloomfilter) -func (rt *RoutingTable) SelectRandomPeer(pf *blocks.PeerFilter) (p *PeerAddress) { +func (rt *RoutingTable) SelectRandomPeer(pf *blocks.PeerFilter, pid int) (p *PeerAddress) { // no writer allowed - rt.RLock() - defer rt.RUnlock() + rt.lock(true, pid) + defer rt.unlock(true, pid) // select random entry from list var ok bool for { - if _, p, ok = rt.list.GetRandom(); !ok { + if _, p, ok = rt.list.GetRandom(pid); !ok { return nil } if !pf.Contains(p.Peer) { @@ -268,19 +287,19 @@ func (rt *RoutingTable) SelectRandomPeer(pf *blocks.PeerFilter) (p *PeerAddress) // SelectPeer selects a neighbor depending on the number of hops parameter. // If hops < NSE this function MUST return SelectRandomPeer() and // SelectClosestpeer() otherwise. -func (rt *RoutingTable) SelectPeer(p *PeerAddress, hops int, bf *blocks.PeerFilter) *PeerAddress { +func (rt *RoutingTable) SelectPeer(p *PeerAddress, hops int, bf *blocks.PeerFilter, pid int) *PeerAddress { if float64(hops) < rt.l2nse { - return rt.SelectRandomPeer(bf) + return rt.SelectRandomPeer(bf, pid) } - return rt.SelectClosestPeer(p, bf) + return rt.SelectClosestPeer(p, bf, pid) } // IsClosestPeer returns true if p is the closest peer for k. Peers with a // positive test in the Bloom filter are not considered. If p is nil, our // reference address is used. -func (rt *RoutingTable) IsClosestPeer(p, k *PeerAddress, pf *blocks.PeerFilter) bool { +func (rt *RoutingTable) IsClosestPeer(p, k *PeerAddress, pf *blocks.PeerFilter, pid int) bool { // get closest peer in routing table - n := rt.SelectClosestPeer(k, pf) + n := rt.SelectClosestPeer(k, pf, pid) // check SELF? if p == nil { // if no peer in routing table found @@ -326,24 +345,22 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { // check for dead or expired peers logger.Println(logger.DBG, "[dht] RT heartbeat...") timeout := util.NewRelativeTime(time.Duration(rt.cfg.PeerTTL) * time.Second) - if err := rt.Process(func() error { - return rt.list.ProcessRange(func(k string, p *PeerAddress) error { - // check if we can/need to drop a peer - drop := timeout.Compare(p.lastSeen.Elapsed()) < 0 - if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 { - logger.Printf(logger.DBG, "[RT] removing %v: %v, %v", p, p.lastSeen.Elapsed(), p.lastUsed.Elapsed()) - rt.Remove(p) - } - return nil - }, false) + if err := rt.list.ProcessRange(func(k string, p *PeerAddress, pid int) error { + // check if we can/need to drop a peer + drop := timeout.Compare(p.lastSeen.Elapsed()) < 0 + if drop || timeout.Compare(p.lastUsed.Elapsed()) < 0 { + logger.Printf(logger.DBG, "[RT] removing %v: %v, %v", p, p.lastSeen.Elapsed(), p.lastUsed.Elapsed()) + rt.Remove(p, pid) + } + return nil }, false); err != nil { logger.Println(logger.ERROR, "[dht] RT heartbeat failed: "+err.Error()) } // drop expired entries from the HELLO cache - _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock) error { + _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock, pid int) error { if val.Expires.Expired() { - rt.helloCache.Delete(key) + rt.helloCache.Delete(key, pid) } return nil }, false) @@ -356,7 +373,7 @@ func (rt *RoutingTable) heartbeat(ctx context.Context) { func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter) (hb *blocks.HelloBlock, dist *math.Int) { // iterate over cached HELLOs to find (best) match first - _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock) error { + _ = rt.helloCache.ProcessRange(func(key string, val *blocks.HelloBlock, _ int) error { // check if block is excluded by result filter if !rf.Contains(val) { // check for better match @@ -374,19 +391,19 @@ func (rt *RoutingTable) BestHello(addr *PeerAddress, rf blocks.ResultFilter) (hb // CacheHello adds a HELLO block to the list of cached entries. func (rt *RoutingTable) CacheHello(hb *blocks.HelloBlock) { - rt.helloCache.Put(hb.PeerID.String(), hb) + rt.helloCache.Put(hb.PeerID.String(), hb, 0) } // GetHello returns a HELLO block for key k (if available) func (rt *RoutingTable) GetHello(k string) (*blocks.HelloBlock, bool) { - return rt.helloCache.Get(k) + return rt.helloCache.Get(k, 0) } //---------------------------------------------------------------------- // lock with given mode (if not in processing function) -func (rt *RoutingTable) lock(readonly bool) { - if !rt.inProcess { +func (rt *RoutingTable) lock(readonly bool, pid int) { + if _, ok := rt.inProcess[pid]; !ok { if readonly { rt.RLock() } else { @@ -396,8 +413,8 @@ func (rt *RoutingTable) lock(readonly bool) { } // lock with given mode (if not in processing function) -func (rt *RoutingTable) unlock(readonly bool) { - if !rt.inProcess { +func (rt *RoutingTable) unlock(readonly bool, pid int) { + if _, ok := rt.inProcess[pid]; !ok { if readonly { rt.RUnlock() } else { @@ -441,6 +458,11 @@ func (b *Bucket) Add(p *PeerAddress) bool { return false } +// FreeSpace returns the number of empty slots in bucket +func (b *Bucket) FreeSpace() int { + return numK - len(b.list) +} + // Remove peer address from the bucket. // Returns true if entry is removed (found), false otherwise. func (b *Bucket) Remove(p *PeerAddress) bool { diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go @@ -95,13 +95,13 @@ func TestRT(t *testing.T) { // actions: connected := func(task *Entry, e int64, msg string) { - rt.Add(task.addr) + rt.Add(task.addr, "test") task.online = true task.last = e t.Logf("[%6d] %s %s\n", e, task.addr, msg) } disconnected := func(task *Entry, e int64, msg string) { - rt.Remove(task.addr) + rt.Remove(task.addr, 0) task.online = false task.last = e t.Logf("[%6d] %s %s\n", e, task.addr, msg) @@ -140,10 +140,10 @@ func TestRT(t *testing.T) { // execute some routing functions on remaining table k := genRemotePeer() pf := blocks.NewPeerFilter() - n := rt.SelectClosestPeer(k, pf) + n := rt.SelectClosestPeer(k, pf, 0) t.Logf("Closest: %s -> %s\n", k, n) - n = rt.SelectRandomPeer(pf) + n = rt.SelectRandomPeer(pf, 0) t.Logf("Random: %s\n", n) } diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go @@ -24,6 +24,7 @@ import ( "time" ) +//---------------------------------------------------------------------- // Module is an interface for GNUnet service modules (workers). // // Modules can call other GNUnet services; these services can be used by @@ -55,6 +56,7 @@ import ( // Exported and imported module function are identified by name defined in the // Export() function. Import() functions that access functions in other modules // need to use the same name for linking. +// type Module interface { // Export functions by name Export(map[string]any) @@ -69,6 +71,10 @@ type Module interface { Filter() *core.EventFilter } +//---------------------------------------------------------------------- +// Event handling +//---------------------------------------------------------------------- + // EventHandler is a function prototype for event handling type EventHandler func(context.Context, *core.Event) @@ -78,6 +84,10 @@ type Heartbeat func(context.Context) // CtxKey is a value-context key type CtxKey string +//---------------------------------------------------------------------- +// Generic module implementation +//---------------------------------------------------------------------- + // ModuleImpl is an event-handling type used by Module implementations. type ModuleImpl struct { // channel for core events. diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go @@ -69,15 +69,15 @@ func (m *Module) Import(fcm map[string]any) { // Get an entry from the cache if available. func (m *Module) Get(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) { - var b blocks.Block - if b, err = m.cache.Get(query); err != nil { + var e *store.DHTEntry + if e, err = m.cache.Get(query); err != nil { return } - err = blocks.Unwrap(b, block) + err = blocks.Unwrap(e.Blk, block) return } // Put entry into the cache. -func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) error { - return m.cache.Put(query, block) +func (m *Module) Put(ctx context.Context, query *blocks.GNSQuery, entry *store.DHTEntry) error { + return m.cache.Put(query, entry) } diff --git a/src/gnunet/service/store/database.go b/src/gnunet/service/store/database.go @@ -108,9 +108,9 @@ type dbPool struct { // remove a database instance from the pool based on its connect string. func (p *dbPool) remove(key string) error { - return p.insts.Process(func() (err error) { + return p.insts.Process(func(pid int) (err error) { // get pool entry - pe, ok := p.insts.Get(key) + pe, ok := p.insts.Get(key, pid) if !ok { return nil } @@ -119,7 +119,7 @@ func (p *dbPool) remove(key string) error { if pe.refs == 0 { // no more refs: close database err = pe.db.Close() - p.insts.Delete(key) + p.insts.Delete(key, pid) } return }, false) @@ -138,10 +138,10 @@ func (p *dbPool) remove(key string) error { // information required to log into the database (e.g. // "[user[:passwd]@][proto[(addr)]]/dbname[?param1=value1&...]"). func (p *dbPool) Connect(spec string) (db *DBConn, err error) { - err = p.insts.Process(func() error { + err = p.insts.Process(func(pid int) error { // check if we have a connection to this database. db = new(DBConn) - inst, ok := p.insts.Get(spec) + inst, ok := p.insts.Get(spec, pid) if !ok { inst = new(DBPoolEntry) inst.refs = 0 @@ -174,7 +174,7 @@ func (p *dbPool) Connect(spec string) (db *DBConn, err error) { return ErrSQLInvalidDatabaseSpec } // save database in pool - p.insts.Put(spec, inst) + p.insts.Put(spec, inst, pid) } // increment reference count inst.refs++ diff --git a/src/gnunet/service/store/dhtstore_test.go b/src/gnunet/service/store/dhtstore_test.go @@ -25,6 +25,7 @@ import ( "gnunet/service/dht/blocks" "gnunet/util" "math/rand" + "os" "testing" ) @@ -38,13 +39,21 @@ const ( // each block from storage and checks for matching hash. func TestDHTFilesStore(t *testing.T) { // test configuration + path := "/tmp/dht-store" + defer func() { + os.RemoveAll(path) + }() + cfg := make(util.ParameterSet) cfg["mode"] = "file" cfg["cache"] = false - cfg["path"] = "/var/lib/gnunet/dht/store" + cfg["path"] = path cfg["maxGB"] = 10 // create file store + if _, err := os.Stat(path); err != nil { + os.MkdirAll(path, 0755) + } fs, err := NewFileStore(cfg) if err != nil { t.Fatal(err) @@ -58,27 +67,30 @@ func TestDHTFilesStore(t *testing.T) { size := 1024 + rand.Intn(62000) buf := make([]byte, size) rand.Read(buf) - val := blocks.NewGenericBlock(buf) + blk := blocks.NewGenericBlock(buf) // generate associated key - k := crypto.Hash(buf).Bits + k := crypto.Hash(buf) key := blocks.NewGenericQuery(k, enums.BLOCK_TYPE_ANY, 0) - // store block + // store entry + val := &DHTEntry{ + Blk: blk, + } if err := fs.Put(key, val); err != nil { - t.Fatal(err) + t.Fatalf("[%d] %s", i, err) } // remember key keys = append(keys, key) } // Second round: retrieve blocks and check - for _, key := range keys { + for i, key := range keys { // get block val, err := fs.Get(key) if err != nil { - t.Fatal(err) + t.Fatalf("[%d] %s", i, err) } - buf := val.Data() + buf := val.Blk.Bytes() // re-create key k := crypto.Hash(buf) @@ -91,3 +103,7 @@ func TestDHTFilesStore(t *testing.T) { } } } + +func TestDHTEntryStore(t *testing.T) { + // pth, sender, local := path.GenerateTestPath(10) +} diff --git a/src/gnunet/service/store/store.go b/src/gnunet/service/store/store.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "gnunet/service/dht/blocks" + "gnunet/service/dht/path" "gnunet/util" redis "github.com/go-redis/redis/v8" @@ -70,8 +71,14 @@ type Store[K, V any] interface { // Types for custom store requirements //------------------------------------------------------------ +// DHTEntry to be stored/retrieved +type DHTEntry struct { + Blk blocks.Block + Path *path.Path +} + // DHTStore for DHT queries and blocks -type DHTStore Store[blocks.Query, blocks.Block] +type DHTStore Store[blocks.Query, *DHTEntry] // KVStore for key/value string pairs type KVStore Store[string, string] diff --git a/src/gnunet/service/store/store_fs.go b/src/gnunet/service/store/store_fs.go @@ -22,10 +22,11 @@ import ( "encoding/hex" "fmt" "gnunet/service/dht/blocks" + "gnunet/service/dht/path" "gnunet/util" - "io/ioutil" "os" + "github.com/bfix/gospel/data" "github.com/bfix/gospel/logger" "github.com/bfix/gospel/math" ) @@ -101,7 +102,7 @@ func (s *FileStore) Close() (err error) { } // Put block into storage under given key -func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { +func (s *FileStore) Put(query blocks.Query, entry *DHTEntry) (err error) { // check for free space if !s.cache { if int(s.totalSize>>30) > s.maxSpace { @@ -111,29 +112,18 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { } // get parameters btype := query.Type() - expire := block.Expire() + expire := entry.Blk.Expire() + blkSize := len(entry.Blk.Bytes()) - // get path and filename from key - path, fname := s.expandPath(query.Key().Bits) - // make sure the path exists - if err = os.MkdirAll(path, 0755); err != nil { + // write entry to file for storage + if err = s.writeEntry(query.Key().Bits, entry); err != nil { return } - // write to file for storage - var fp *os.File - bd := block.Data() - if fp, err = os.Create(path + "/" + fname); err == nil { - defer fp.Close() - // write block data - if _, err = fp.Write(bd); err != nil { - return - } - } // compile metadata now := util.AbsoluteTimeNow() meta := &FileMetadata{ key: query.Key().Bits, - size: uint64(len(bd)), + size: uint64(blkSize), btype: btype, expires: expire, stored: now, @@ -156,7 +146,7 @@ func (s *FileStore) Put(query blocks.Query, block blocks.Block) (err error) { } // Get block with given key from storage -func (s *FileStore) Get(query blocks.Query) (block blocks.Block, err error) { +func (s *FileStore) Get(query blocks.Query) (entry *DHTEntry, err error) { // check if we have metadata for the query key := query.Key().Bits btype := query.Type() @@ -173,14 +163,14 @@ func (s *FileStore) Get(query blocks.Query) (block blocks.Block, err error) { if err = s.meta.Used(key, btype); err != nil { return } - return s.readBlock(query.Key().Bits) + return s.readEntry(key) } // GetApprox returns the best-matching value with given key from storage // that is not excluded -func (s *FileStore) GetApprox(query blocks.Query, excl func(blocks.Block) bool) (block blocks.Block, key any, err error) { +func (s *FileStore) GetApprox(query blocks.Query, excl func(*DHTEntry) bool) (entry *DHTEntry, key any, err error) { var bestKey []byte - var bestBlk blocks.Block + var bestEntry *DHTEntry var bestDist *math.Int // distance function dist := func(a, b []byte) *math.Int { @@ -194,32 +184,32 @@ func (s *FileStore) GetApprox(query blocks.Query, excl func(blocks.Block) bool) check := func(md *FileMetadata) { // check for better match d := dist(md.key, query.Key().Bits) + var entry *DHTEntry if bestKey == nil || d.Cmp(bestDist) < 0 { // we might have a match. check block for exclusion - block, err = s.readBlock(md.key) - if err != nil { - logger.Printf(logger.ERROR, "[dhtstore] failed to retrieve blok for %s", hex.EncodeToString(md.key)) + if entry, err = s.readEntry(md.key); err != nil { + logger.Printf(logger.ERROR, "[dhtstore] failed to retrieve block for %s", hex.EncodeToString(md.key)) return } - if excl(block) { + if excl(entry) { return } // remember best match bestKey = md.key - bestBlk = block + bestEntry = entry bestDist = d } } if err = s.meta.Traverse(check); err != nil { return } - if bestBlk != nil { + if bestEntry != nil { // mark the block as newly used - if err = s.meta.Used(bestKey, bestBlk.Type()); err != nil { + if err = s.meta.Used(bestKey, bestEntry.Blk.Type()); err != nil { return } } - return bestBlk, bestDist, nil + return bestEntry, bestDist, nil } // Get a list of all stored block keys (generic query). @@ -227,24 +217,75 @@ func (s *FileStore) List() ([]blocks.Query, error) { return nil, ErrStoreNoList } -// read block from storage for given key -func (s *FileStore) readBlock(key []byte) (block blocks.Block, err error) { +//---------------------------------------------------------------------- + +type entryLayout struct { + SizeBlk uint16 `order:"big"` // size of block data + SizePth uint16 `order:"big"` // size of path data + Block []byte `size:"SizeBlk"` // block data + Path []byte `size:"SizePth"` // path data +} + +// read entry from storage for given key +func (s *FileStore) readEntry(key []byte) (entry *DHTEntry, err error) { // get path and filename from key - path, fname := s.expandPath(key) - // read file content (block data) + folder, fname := s.expandPath(key) + + // open file for reading + var file *os.File + if file, err = os.Open(folder + "/" + fname); err != nil { + return + } + defer file.Close() + + // get file size + fi, _ := file.Stat() + size := int(fi.Size()) + + // read data + val := new(entryLayout) + if err = data.UnmarshalStream(file, val, size); err != nil { + return + } + // assemble entry + entry = new(DHTEntry) + entry.Blk = blocks.NewGenericBlock(val.Block) + entry.Path, err = path.NewPathFromBytes(val.Path) + return +} + +// write entry to storage for given key +func (s *FileStore) writeEntry(key []byte, entry *DHTEntry) (err error) { + // get folder and filename from key + folder, fname := s.expandPath(key) + // make sure the folder exists + if err = os.MkdirAll(folder, 0755); err != nil { + return + } + // write to file content (block data) var file *os.File - if file, err = os.Open(path + "/" + fname); err != nil { + if file, err = os.Create(folder + "/" + fname); err != nil { return } defer file.Close() - // read block data - var data []byte - if data, err = ioutil.ReadAll(file); err == nil { - block = blocks.NewGenericBlock(data) + + // assemble and write entry + val := new(entryLayout) + val.Block = entry.Blk.Bytes() + val.SizeBlk = uint16(len(val.Block)) + if entry.Path != nil { + val.Path = entry.Path.Bytes() + val.SizePth = uint16(len(val.Path)) + } else { + val.Path = nil + val.SizePth = 0 } + err = data.MarshalStream(file, val) return } +//---------------------------------------------------------------------- + // expandPath returns the full path to the file for given key. func (s *FileStore) expandPath(key []byte) (string, string) { h := hex.EncodeToString(key) diff --git a/src/gnunet/service/store/store_fs_meta.go b/src/gnunet/service/store/store_fs_meta.go @@ -21,6 +21,7 @@ package store import ( "database/sql" _ "embed" + "gnunet/enums" "gnunet/util" "os" ) @@ -34,7 +35,7 @@ import ( type FileMetadata struct { key []byte // storage key size uint64 // size of file - btype uint16 // block type + btype enums.BlockType // block type stored util.AbsoluteTime // time added to store expires util.AbsoluteTime // expiration time lastUsed util.AbsoluteTime // time last used @@ -91,16 +92,18 @@ func (db *FileMetaDB) Store(md *FileMetadata) (err error) { } // Get block metadata from database -func (db *FileMetaDB) Get(key []byte, btype uint16) (md *FileMetadata, err error) { +func (db *FileMetaDB) Get(key []byte, btype enums.BlockType) (md *FileMetadata, err error) { md = new(FileMetadata) md.key = util.Clone(key) md.btype = btype stmt := "select size,stored,expires,lastUsed,usedCount from meta where qkey=? and btype=?" row := db.conn.QueryRow(stmt, key, btype) var st, exp, lu uint64 - if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err == sql.ErrNoRows { - md = nil - err = nil + if err = row.Scan(&md.size, &st, &exp, &lu, &md.usedCount); err != nil { + if err == sql.ErrNoRows { + md = nil + err = nil + } } else { md.stored.Val = st * 1000000 md.expires.Val = exp * 1000000 @@ -110,13 +113,13 @@ func (db *FileMetaDB) Get(key []byte, btype uint16) (md *FileMetadata, err error } // Drop metadata for block from database -func (db *FileMetaDB) Drop(key []byte, btype uint16) error { +func (db *FileMetaDB) Drop(key []byte, btype enums.BlockType) error { _, err := db.conn.Exec("delete from meta where qkey=? and btype=?", key, btype) return err } // Used a block from store: increment usage count and lastUsed time. -func (db *FileMetaDB) Used(key []byte, btype uint16) error { +func (db *FileMetaDB) Used(key []byte, btype enums.BlockType) error { _, err := db.conn.Exec("update meta set usedCount=usedCount+1,lastUsed=unixepoch() where qkey=? and btype=?", key, btype) return err } diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go @@ -234,14 +234,12 @@ func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) { // try to convert addr to compatible type switch ep.netw { case "udp", "udp4", "udp6": - var ua *net.UDPAddr var err error - if ua, err = net.ResolveUDPAddr(ep.netw, addr.String()); err != nil { + if _, err = net.ResolveUDPAddr(ep.netw, addr.String()); err != nil { ok = false } - logger.Printf(logger.DBG, "[pkt_ep] %s + %v -> %v (%v)", ep.netw, addr, ua, ok) default: - logger.Printf(logger.DBG, "[pkt_ep] unknown network %s", ep.netw) + logger.Printf(logger.WARN, "[pkt_ep] unknown network %s", ep.netw) ok = false } } else { @@ -309,7 +307,7 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *Message) (err erro return } session := util.NextID() - ep.conns.Put(session, conn) + ep.conns.Put(session, conn, 0) go func() { for { // read next message from connection @@ -324,7 +322,7 @@ func (ep *StreamEndpoint) Run(ctx context.Context, hdlr chan *Message) (err erro } // connection ended. conn.Close() - ep.conns.Delete(session) + ep.conns.Delete(session, 0) }() } }() diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go @@ -19,10 +19,7 @@ package transport import ( - "bytes" "context" - "encoding/hex" - "encoding/json" "errors" "fmt" "gnunet/message" @@ -42,21 +39,16 @@ func WriteMessage(ctx context.Context, wrt io.WriteCloser, msg message.Message) // convert message to binary data var buf []byte if buf, err = data.Marshal(msg); err != nil { - return err + return } // check message header size and packet size mh, err := message.GetMsgHeader(buf) if err != nil { - return err + return } if len(buf) != int(mh.MsgSize) { return errors.New("WriteMessage: message size mismatch") } - // watch dog for write operation - go func() { - <-ctx.Done() - wrt.Close() - }() // perform write operation var n int if n, err = wrt.Write(buf); err != nil { @@ -120,26 +112,6 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag } //---------------------------------------------------------------------- -// Dump message -func Dump(msg message.Message, format string) string { - switch format { - case "json": - buf, err := json.Marshal(msg) - if err != nil { - return err.Error() - } - return string(buf) - case "hex": - buf := new(bytes.Buffer) - if err := WriteMessageDirect(buf, msg); err != nil { - return err.Error() - } - return hex.EncodeToString(buf.Bytes()) - } - return "unknown message dump format" -} - -//---------------------------------------------------------------------- // helper for wrapped ReadCloser/WriteCloser (close is nop) //---------------------------------------------------------------------- diff --git a/src/gnunet/transport/transport.go b/src/gnunet/transport/transport.go @@ -25,6 +25,7 @@ import ( "gnunet/message" "gnunet/util" "net" + "strings" "github.com/bfix/gospel/network" ) @@ -128,7 +129,7 @@ func (t *Transport) Shutdown() { func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *Message) (err error) { // select best endpoint able to handle address var bestEp Endpoint - err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error { + err = t.endpoints.ProcessRange(func(_ int, ep Endpoint, _ int) error { if ep.CanSendTo(addr) { if bestEp == nil { bestEp = ep @@ -139,7 +140,7 @@ func (t *Transport) Send(ctx context.Context, addr net.Addr, msg *Message) (err // } } return nil - }, true) + }, false) if err != nil { return } @@ -160,7 +161,7 @@ func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep End } // check if endpoint is already available as := addr.Network() + "://" + addr.String() - if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint) error { + if err = t.endpoints.ProcessRange(func(_ int, ep Endpoint, _ int) error { ae := ep.Address().Network() + "://" + ep.Address().String() if as == ae { return ErrEndpExists @@ -174,7 +175,7 @@ func (t *Transport) AddEndpoint(ctx context.Context, addr *util.Address) (ep End return } // add endpoint to list and run it - t.endpoints.Put(ep.ID(), ep) + t.endpoints.Put(ep.ID(), ep, 0) err = ep.Run(ctx, t.incoming) return } @@ -203,3 +204,19 @@ func (t *Transport) ForwardClose(id string) error { } return t.upnp.Unassign(id) } + +//---------------------------------------------------------------------- +// Helper functions +//---------------------------------------------------------------------- + +// CanHandleAddress returns true, if a given address can be handled by the +// transport framework +func CanHandleAddress(addr *util.Address) bool { + // filter out local addresses + s := addr.String() + if idx := strings.LastIndex(s, ":"); idx != -1 { + s = s[:idx] + } + ip := net.ParseIP(s) + return !(ip == nil || ip.IsLoopback()) +} diff --git a/src/gnunet/util/address.go b/src/gnunet/util/address.go @@ -117,9 +117,9 @@ func (a *PeerAddrList) Add(peer *PeerID, addr *Address) (mode int) { mode = 0 if !addr.Expires.Expired() { // run add operation - _ = a.list.Process(func() error { + _ = a.list.Process(func(pid int) error { id := peer.String() - list, ok := a.list.Get(id) + list, ok := a.list.Get(id, pid) if !ok { list = make([]*Address, 0) mode = 1 @@ -132,7 +132,7 @@ func (a *PeerAddrList) Add(peer *PeerID, addr *Address) (mode int) { mode = 2 } list = append(list, addr) - a.list.Put(id, list) + a.list.Put(id, list, pid) return nil }, false) } @@ -142,7 +142,7 @@ func (a *PeerAddrList) Add(peer *PeerID, addr *Address) (mode int) { // Get address for peer func (a *PeerAddrList) Get(peer *PeerID, transport string) (res []*Address) { id := peer.String() - list, ok := a.list.Get(id) + list, ok := a.list.Get(id, 0) if ok { for _, addr := range list { // check for expired address. @@ -163,12 +163,12 @@ func (a *PeerAddrList) Get(peer *PeerID, transport string) (res []*Address) { // Delete a list entry by key. func (a *PeerAddrList) Delete(peer *PeerID) { - a.list.Delete(peer.String()) + a.list.Delete(peer.String(), 0) } // Contains checks if a peer is contained in the list. Does not check // for expired entries. func (a *PeerAddrList) Contains(peer *PeerID) (ok bool) { - _, ok = a.list.Get(peer.String()) + _, ok = a.list.Get(peer.String(), 0) return } diff --git a/src/gnunet/util/map.go b/src/gnunet/util/map.go @@ -24,22 +24,65 @@ import ( ) //---------------------------------------------------------------------- +// ID list of active map processes: +// An active process (wrapped in a 'Process()' and 'ProcessRange()' +// call) locks (and unlocks) map access only once around a process, so +// calls to map methods from within a process are safe (no lock/unlock +// required). +//---------------------------------------------------------------------- + +// PIDList is a thread-safe list of active process IDs +type PIDList struct { + sync.RWMutex + list map[int]struct{} +} + +// NewPIDList creates a new PID list instance +func NewPIDList() *PIDList { + return &PIDList{ + list: make(map[int]struct{}), + } +} + +// Add pid to list +func (pl *PIDList) Add(pid int) { + pl.Lock() + defer pl.Unlock() + pl.list[pid] = struct{}{} +} + +// Remove pid from list +func (pl *PIDList) Remove(pid int) { + pl.Lock() + defer pl.Unlock() + delete(pl.list, pid) +} + +// Contains returns true if 'pid' is a list element +func (pl *PIDList) Contains(pid int) bool { + pl.RLock() + defer pl.RUnlock() + _, ok := pl.list[pid] + return ok +} + +//---------------------------------------------------------------------- // Thread-safe map implementation //---------------------------------------------------------------------- -// Map keys to values +// Map comparable keys to values of any type type Map[K comparable, V any] struct { sync.RWMutex list map[K]V - inProcess bool + inProcess *PIDList } // NewMap allocates a new mapping. func NewMap[K comparable, V any]() *Map[K, V] { return &Map[K, V]{ list: make(map[K]V), - inProcess: false, + inProcess: NewPIDList(), } } @@ -47,31 +90,33 @@ func NewMap[K comparable, V any]() *Map[K, V] { // Process a function in the locked map context. Calls // to other map functions in 'f' will skip their locks. -func (m *Map[K, V]) Process(f func() error, readonly bool) error { +func (m *Map[K, V]) Process(f func(pid int) error, readonly bool) error { // handle locking - m.lock(readonly) - m.inProcess = true + m.lock(readonly, 0) + pid := NextID() + m.inProcess.Add(pid) defer func() { - m.inProcess = false - m.unlock(readonly) + m.inProcess.Remove(pid) + m.unlock(readonly, 0) }() // function call in unlocked environment - return f() + return f(pid) } // Process a ranged function in the locked map context. Calls // to other map functions in 'f' will skip their locks. -func (m *Map[K, V]) ProcessRange(f func(key K, value V) error, readonly bool) error { +func (m *Map[K, V]) ProcessRange(f func(key K, value V, pid int) error, readonly bool) error { // handle locking - m.lock(readonly) - m.inProcess = true + m.lock(readonly, 0) + pid := NextID() + m.inProcess.Add(pid) defer func() { - m.inProcess = false - m.unlock(readonly) + m.inProcess.Remove(pid) + m.unlock(readonly, 0) }() // range over map and call function. for key, value := range m.list { - if err := f(key, value); err != nil { + if err := f(key, value, pid); err != nil { return err } } @@ -86,24 +131,24 @@ func (m *Map[K, V]) Size() int { } // Put value into map under given key. -func (m *Map[K, V]) Put(key K, value V) { - m.lock(false) - defer m.unlock(false) +func (m *Map[K, V]) Put(key K, value V, pid int) { + m.lock(false, pid) + defer m.unlock(false, pid) m.list[key] = value } // Get value with iven key from map. -func (m *Map[K, V]) Get(key K) (value V, ok bool) { - m.lock(true) - defer m.unlock(true) +func (m *Map[K, V]) Get(key K, pid int) (value V, ok bool) { + m.lock(true, pid) + defer m.unlock(true, pid) value, ok = m.list[key] return } // GetRandom returns a random map entry. -func (m *Map[K, V]) GetRandom() (key K, value V, ok bool) { - m.lock(true) - defer m.unlock(true) +func (m *Map[K, V]) GetRandom(pid int) (key K, value V, ok bool) { + m.lock(true, pid) + defer m.unlock(true, pid) ok = false if size := m.Size(); size > 0 { @@ -120,17 +165,17 @@ func (m *Map[K, V]) GetRandom() (key K, value V, ok bool) { } // Delete key/value pair from map. -func (m *Map[K, V]) Delete(key K) { - m.lock(false) - defer m.unlock(false) +func (m *Map[K, V]) Delete(key K, pid int) { + m.lock(false, pid) + defer m.unlock(false, pid) delete(m.list, key) } //---------------------------------------------------------------------- // lock with given mode (if not in processing function) -func (m *Map[K, V]) lock(readonly bool) { - if !m.inProcess { +func (m *Map[K, V]) lock(readonly bool, pid int) { + if !m.inProcess.Contains(pid) { if readonly { m.RLock() } else { @@ -140,8 +185,8 @@ func (m *Map[K, V]) lock(readonly bool) { } // lock with given mode (if not in processing function) -func (m *Map[K, V]) unlock(readonly bool) { - if !m.inProcess { +func (m *Map[K, V]) unlock(readonly bool, pid int) { + if !m.inProcess.Contains(pid) { if readonly { m.RUnlock() } else { diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go @@ -19,7 +19,11 @@ package util import ( + "encoding/hex" + "encoding/json" "strings" + + "github.com/bfix/gospel/data" ) //---------------------------------------------------------------------- @@ -80,3 +84,23 @@ func StripPathRight(s string) string { } return s } + +//---------------------------------------------------------------------- +// Dump instance +func Dump(obj any, format string) string { + switch format { + case "json": + buf, err := json.Marshal(obj) + if err != nil { + return err.Error() + } + return string(buf) + case "hex": + buf, err := data.Marshal(obj) + if err != nil { + return err.Error() + } + return hex.EncodeToString(buf) + } + return "unknown message dump format" +} diff --git a/src/gnunet/util/peer.go b/src/gnunet/util/peer.go @@ -20,6 +20,8 @@ package util import ( "bytes" + + "github.com/bfix/gospel/crypto/ed25519" ) //---------------------------------------------------------------------- @@ -28,34 +30,54 @@ import ( // PeerPublicKey is the binary representation of an Ed25519 public key type PeerPublicKey struct { - Data []byte `size:"32"` // Ed25519 public key data + Data []byte `size:"(Size)"` // Ed25519 public key data } // NewPeerPublicKey creates a key instance from binary data func NewPeerPublicKey(data []byte) *PeerPublicKey { - pk := &PeerPublicKey{ - Data: make([]byte, 32), - } - if data != nil { - if len(data) < 32 { - CopyAlignedBlock(pk.Data, data) + pk := new(PeerPublicKey) + size := pk.Size() + v := make([]byte, size) + if data != nil && len(data) > 0 { + if uint(len(data)) < size { + CopyAlignedBlock(v, data) } else { - copy(pk.Data, data[:32]) + copy(v, data[:size]) } } + pk.Data = v return pk } +// Size returns the length of the binary data +func (pk *PeerPublicKey) Size() uint { + return 32 +} + +// Verify peer signature +func (pk *PeerPublicKey) Verify(data []byte, sig *PeerSignature) (bool, error) { + xpk := ed25519.NewPublicKeyFromBytes(pk.Data) + xsig, err := ed25519.NewEdSignatureFromBytes(sig.Data) + if err != nil { + return false, err + } + return xpk.EdVerify(data, xsig) +} + //---------------------------------------------------------------------- // Peer identifier: //---------------------------------------------------------------------- // PeerID is a wrpped PeerPublicKey -type PeerID PeerPublicKey +type PeerID struct { + PeerPublicKey +} // NewPeerID creates a new peer id from data. func NewPeerID(data []byte) (p *PeerID) { - return (*PeerID)(NewPeerPublicKey(data)) + return &PeerID{ + PeerPublicKey: *NewPeerPublicKey(data), + } } // Equals returns true if two peer IDs match. @@ -74,21 +96,36 @@ func (p *PeerID) Bytes() []byte { } //---------------------------------------------------------------------- +// Peer signature (EdDSA signature) +//---------------------------------------------------------------------- // PeerSignature is a EdDSA signature from the peer type PeerSignature struct { - Data []byte `size:"64"` + Data []byte `size:"(Size)"` } // NewPeerSignature is a EdDSA signatre with the private peer key func NewPeerSignature(data []byte) *PeerSignature { - var v []byte - if data == nil { - v = make([]byte, 64) - } else { - v = Clone(data) - } - return &PeerSignature{ - Data: v, + s := new(PeerSignature) + size := s.Size() + v := make([]byte, size) + if data != nil && len(data) > 0 { + if uint(len(data)) < size { + CopyAlignedBlock(v, data) + } else { + copy(v, data[:size]) + } } + s.Data = v + return s +} + +// Size returns the length of the binary data +func (s *PeerSignature) Size() uint { + return 64 +} + +// Bytes returns the binary representation of a peer signature. +func (s *PeerSignature) Bytes() []byte { + return Clone(s.Data) } diff --git a/src/gnunet/util/peer_test.go b/src/gnunet/util/peer_test.go @@ -0,0 +1,38 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package util + +import ( + "encoding/base64" + "testing" + + "github.com/bfix/gospel/crypto/ed25519" +) + +func TestPeerIDString(t *testing.T) { + seed := "iYK1wSi5XtCP774eNFk1LYXqKlOPEpwKBw+2/bMkE24=" + data, err := base64.StdEncoding.DecodeString(seed) + if err != nil { + t.Fatal(err) + } + prv := ed25519.NewPrivateKeyFromSeed(data) + pub := prv.Public() + id := NewPeerID(pub.Bytes()) + t.Log(id) +}