gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit de577428a69a0002c3194afdf0562cf5a4dc1bdc
parent 913c80f317270b41f339048afa5d63278eecf8f4
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Sat,  4 Jun 2022 11:32:09 +0200

Heartbeat handling in modules added.

Diffstat:
Msrc/gnunet/cmd/peer_mockup/main.go | 2+-
Msrc/gnunet/cmd/revoke-zonekey/main.go | 2+-
Msrc/gnunet/go.mod | 4+---
Msrc/gnunet/go.sum | 4++--
Msrc/gnunet/service/dht/module.go | 22+++++++++++++++++++---
Msrc/gnunet/service/dht/routingtable.go | 120++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
Msrc/gnunet/service/dht/routingtable_test.go | 3++-
Msrc/gnunet/service/gns/module.go | 2+-
Msrc/gnunet/service/module.go | 23++++++++++++++++++++++-
Msrc/gnunet/service/revocation/module.go | 2+-
Msrc/gnunet/util/time.go | 29++++++++++++++++++++++++++---
11 files changed, 162 insertions(+), 51 deletions(-)

diff --git a/src/gnunet/cmd/peer_mockup/main.go b/src/gnunet/cmd/peer_mockup/main.go @@ -71,7 +71,7 @@ func main() { // handle messages coming from network module := service.NewModuleImpl() - listener := module.Run(ctx, process, nil) + listener := module.Run(ctx, process, nil, 0, nil) c.Register("mockup", listener) if !asServer { diff --git a/src/gnunet/cmd/revoke-zonekey/main.go b/src/gnunet/cmd/revoke-zonekey/main.go @@ -299,7 +299,7 @@ func main() { } } // update elapsed time - rd.T.Add(util.AbsoluteTimeNow().Diff(startTime)) + rd.T.Add(startTime.Elapsed()) rd.Last = last log.Println("Writing revocation data to file...") diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod @@ -3,7 +3,7 @@ module gnunet go 1.18 require ( - github.com/bfix/gospel v1.2.11 + github.com/bfix/gospel v1.2.14 github.com/go-redis/redis/v8 v8.11.5 github.com/go-sql-driver/mysql v1.6.0 github.com/gorilla/mux v1.8.0 @@ -21,5 +21,3 @@ require ( golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect ) - -replace github.com/bfix/gospel v1.2.11 => /vault/prj/libs/Go/Gospel diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum @@ -1,5 +1,5 @@ -github.com/bfix/gospel v1.2.11 h1:z/c6MFNq/lz4mO8+PK60a3NvH+lbTKAlLCShuFFZUvg= -github.com/bfix/gospel v1.2.11/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= +github.com/bfix/gospel v1.2.14 h1:lIdagJvkebG+uYbVdfK6XbT1udnq/ezd/Gi54EaMtV0= +github.com/bfix/gospel v1.2.14/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -26,6 +26,7 @@ import ( "gnunet/service" "gnunet/service/dht/blocks" "net/http" + "time" ) //====================================================================== @@ -72,7 +73,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { rtable: rt, } // register as listener for core events - listener := m.Run(ctx, m.event, m.Filter()) + listener := m.Run(ctx, m.event, m.Filter(), 15*time.Minute, m.heartbeat) c.Register("dht", listener) return @@ -119,9 +120,24 @@ func (m *Module) Filter() *core.EventFilter { return f } -// Event handler -func (nc *Module) event(ctx context.Context, ev *core.Event) { +// Event handler for infrastructure signals +func (m *Module) event(ctx context.Context, ev *core.Event) { + switch ev.ID { + // New peer connected: + case core.EV_CONNECT: + // Add peer to routing table + } + +} + +// Heartbeat handler for periodic tasks +func (m *Module) heartbeat(ctx context.Context) { + // update the estimated network size + m.rtable.l2nse = m.core.L2NSE() + + // run heartbeat for routing table + m.rtable.heartbeat(ctx) } //---------------------------------------------------------------------- diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go @@ -20,12 +20,15 @@ package dht import ( "bytes" + "context" "crypto/sha512" "encoding/hex" "gnunet/util" "math/rand" "sync" + "time" + "github.com/bfix/gospel/logger" "github.com/bfix/gospel/math" ) @@ -48,7 +51,10 @@ const ( // PeerAddress is the identifier for a peer in the DHT network. // It is the SHA-512 hash of the PeerID (public Ed25519 key). type PeerAddress struct { - addr [sizeAddr]byte + addr [sizeAddr]byte // hash value as bytes + connected bool // is peer connected? + lastSeen util.AbsoluteTime // time the peer was last seen + lastUsed util.AbsoluteTime // time the peer was last used } // NewPeerAddress returns the DHT address of a peer. @@ -57,6 +63,8 @@ func NewPeerAddress(peer *util.PeerID) *PeerAddress { h := rtHash() h.Write(peer.Key) copy(r.addr[:], h.Sum(nil)) + r.lastSeen = util.AbsoluteTimeNow() + r.lastUsed = util.AbsoluteTimeNow() return r } @@ -90,36 +98,51 @@ func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) { // distance to the reference address, so smaller index means // "nearer" to the reference address. type RoutingTable struct { - ref *PeerAddress // reference address for distance - buckets []*Bucket // list of buckets - list map[*PeerAddress]bool // keep list of peers - rwlock sync.RWMutex // lock for write operations - l2nse float64 // log2 of estimated network size + ref *PeerAddress // reference address for distance + buckets []*Bucket // list of buckets + list map[*PeerAddress]struct{} // keep list of peers + rwlock sync.RWMutex // lock for write operations + l2nse float64 // log2 of estimated network size + inProcess bool // flag if Process() is running } // NewRoutingTable creates a new routing table for the reference address. func NewRoutingTable(ref *PeerAddress) *RoutingTable { - rt := new(RoutingTable) - rt.ref = ref - rt.list = make(map[*PeerAddress]bool) - rt.buckets = make([]*Bucket, numBuckets) + // create routing table + rt := &RoutingTable{ + ref: ref, + list: make(map[*PeerAddress]struct{}), + buckets: make([]*Bucket, numBuckets), + l2nse: 0., + inProcess: false, + } + // fill buckets for i := range rt.buckets { rt.buckets[i] = NewBucket(numK) } return rt } +//---------------------------------------------------------------------- +// Peer management +//---------------------------------------------------------------------- + // Add new peer address to routing table. // Returns true if the entry was added, false otherwise. -func (rt *RoutingTable) Add(p *PeerAddress, connected bool) bool { +func (rt *RoutingTable) Add(p *PeerAddress) bool { // ensure one write and no readers rt.rwlock.Lock() defer rt.rwlock.Unlock() + // check if peer is already known + if _, ok := rt.list[p]; ok { + return false + } + // compute distance (bucket index) and insert address. _, idx := p.Distance(rt.ref) - if rt.buckets[idx].Add(p, connected) { - rt.list[p] = true + if rt.buckets[idx].Add(p) { + rt.list[p] = struct{}{} return true } // Full bucket: we did not add the address to the routing table. @@ -139,11 +162,23 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool { delete(rt.list, p) return true } + // remove from internal list + delete(rt.list, p) return false } //---------------------------------------------------------------------- -// routing functions + +// Process a function f in the locked context of a routing table +func (rt *RoutingTable) Process(f func() error) error { + // ensure one write and no readers + rt.rwlock.Lock() + defer rt.rwlock.Unlock() + return f() +} + +//---------------------------------------------------------------------- +// Routing functions //---------------------------------------------------------------------- // SelectClosestPeer for a given peer address and bloomfilter. @@ -160,6 +195,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) ( n = k } } + // mark peer as used + n.lastUsed = util.AbsoluteTimeNow() return } @@ -175,6 +212,8 @@ func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress { idx := rand.Intn(size) for k := range rt.list { if idx == 0 { + // mark peer as used + k.lastUsed = util.AbsoluteTimeNow() return k } idx-- @@ -221,33 +260,50 @@ func (rt *RoutingTable) ComputeOutDegree(repl, hop int) int { return 1 + int(rm1/(rt.l2nse+rm1*hf)) } +//---------------------------------------------------------------------- + +// Heartbeat handler for periodic tasks +func (rt *RoutingTable) heartbeat(ctx context.Context) { + + // check for dead or expired peers + timeout := util.NewRelativeTime(3 * time.Hour) + if err := rt.Process(func() error { + for addr := range rt.list { + if addr.connected { + continue + } + // check if we can/need to drop a peer + drop := timeout.Compare(addr.lastSeen.Elapsed()) < 0 + if drop || timeout.Compare(addr.lastUsed.Elapsed()) < 0 { + rt.Remove(addr) + } + } + return nil + }); err != nil { + logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error()) + } +} + //====================================================================== // Routing table buckets //====================================================================== -// PeerEntry in a k-Bucket: use routing specific attributes -// for book-keeping -type PeerEntry struct { - addr *PeerAddress // peer address - connected bool // is peer connected? -} - // Bucket holds peer entries with approx. same distance from node type Bucket struct { - list []*PeerEntry + list []*PeerAddress rwlock sync.RWMutex } // NewBucket creates a new entry list of given size func NewBucket(n int) *Bucket { return &Bucket{ - list: make([]*PeerEntry, 0, n), + list: make([]*PeerAddress, 0, n), } } // Add peer address to the bucket if there is free space. // Returns true if entry is added, false otherwise. -func (b *Bucket) Add(p *PeerAddress, connected bool) bool { +func (b *Bucket) Add(p *PeerAddress) bool { // only one writer and no readers b.rwlock.Lock() defer b.rwlock.Unlock() @@ -255,11 +311,7 @@ func (b *Bucket) Add(p *PeerAddress, connected bool) bool { // check for free space in bucket if len(b.list) < numK { // append entry at the end - pe := &PeerEntry{ - addr: p, - connected: connected, - } - b.list = append(b.list, pe) + b.list = append(b.list, p) return true } return false @@ -273,7 +325,7 @@ func (b *Bucket) Remove(p *PeerAddress) bool { defer b.rwlock.Unlock() for i, pe := range b.list { - if pe.addr.Equals(p) { + if pe.Equals(p) { // found entry: remove it b.list = append(b.list[:i], b.list[i+1:]...) return true @@ -289,16 +341,16 @@ func (b *Bucket) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *Peer b.rwlock.RLock() defer b.rwlock.RUnlock() - for _, pe := range b.list { + for _, addr := range b.list { // skip addresses in bloomfilter - if bf.Contains(pe.addr) { + if bf.Contains(addr) { continue } // check for shorter distance - if d, _ := p.Distance(pe.addr); n == nil || d.Cmp(dist) < 0 { + if d, _ := p.Distance(addr); n == nil || d.Cmp(dist) < 0 { // remember best match dist = d - n = pe.addr + n = addr } } return diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go @@ -87,7 +87,8 @@ func TestRT(t *testing.T) { // actions: connected := func(task *Entry, e int64, msg string) { - rt.Add(task.addr, true) + task.addr.connected = true + rt.Add(task.addr) task.online = true task.last = e t.Logf("[%6d] %s %s\n", e, task.addr, msg) diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go @@ -103,7 +103,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { ModuleImpl: *service.NewModuleImpl(), } // register as listener for core events - listener := m.Run(ctx, m.event, m.Filter()) + listener := m.Run(ctx, m.event, m.Filter(), 0, nil) c.Register("gns", listener) return diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go @@ -22,6 +22,7 @@ import ( "context" "gnunet/core" "net/http" + "time" ) // Module is an interface for GNUnet service modules (workers). @@ -36,6 +37,9 @@ type Module interface { // EventHandler is a function prototype for event handling type EventHandler func(context.Context, *core.Event) +// Heartbeat is a function prototype for periodic tasks +type Heartbeat func(context.Context) + // ModuleImpl is an event-handling type used by Module implementations. type ModuleImpl struct { ch chan *core.Event // channel for core events. @@ -49,9 +53,19 @@ func NewModuleImpl() (m *ModuleImpl) { } // Run event handling loop -func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter *core.EventFilter) (listener *core.Listener) { +func (m *ModuleImpl) Run( + ctx context.Context, + hdlr EventHandler, filter *core.EventFilter, + pulse time.Duration, heartbeat Heartbeat, +) (listener *core.Listener) { // listener for registration listener = core.NewListener(m.ch, filter) + + // if no heartbeat handler is defined, set pulse to near flatline. + if heartbeat == nil { + pulse = 365 * 24 * time.Hour // once a year + } + tick := time.Tick(pulse) // run event loop go func() { for { @@ -63,6 +77,13 @@ func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter *core.Ev // wait for terminate signal case <-ctx.Done(): return + + // handle heartbeat + case <-tick: + // check for defined heartbeat handler + if heartbeat != nil { + heartbeat(ctx) + } } } }() diff --git a/src/gnunet/service/revocation/module.go b/src/gnunet/service/revocation/module.go @@ -74,7 +74,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { return nil } // register as listener for core events - listener := m.Run(ctx, m.event, m.Filter()) + listener := m.Run(ctx, m.event, m.Filter(), 0, nil) c.Register("gns", listener) return m } diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go @@ -81,16 +81,28 @@ func (t AbsoluteTime) Add(d time.Duration) AbsoluteTime { } } +// Elapsed time since 't'. Return 0 if 't' is in the future. +func (t AbsoluteTime) Elapsed() RelativeTime { + dt, elapsed := t.Diff(AbsoluteTimeNow()) + if !elapsed { + dt = NewRelativeTime(0) + } + return dt +} + // Diff returns the relative time between two absolute times; -// the ordering of the absolute times doesn't matter. -func (t AbsoluteTime) Diff(t2 AbsoluteTime) RelativeTime { +// returns true if t2 is after t1. +func (t AbsoluteTime) Diff(t2 AbsoluteTime) (dt RelativeTime, elapsed bool) { var d uint64 if t.Compare(t2) == 1 { d = t.Val - t2.Val + elapsed = false } else { d = t2.Val - t.Val + elapsed = true } - return RelativeTime{d} + dt = RelativeTime{d} + return } // Expired returns true if the timestamp is in the past. @@ -150,3 +162,14 @@ func (t RelativeTime) String() string { func (t RelativeTime) Add(t2 RelativeTime) { t.Val += t2.Val } + +// Compare two durations +func (t RelativeTime) Compare(t2 RelativeTime) int { + switch { + case t.Val < t2.Val: + return -1 + case t.Val > t2.Val: + return 1 + } + return 0 +}