diff options
author | Bernd Fix <brf@hoi-polloi.org> | 2022-06-04 11:32:09 +0200 |
---|---|---|
committer | Bernd Fix <brf@hoi-polloi.org> | 2022-06-04 11:32:09 +0200 |
commit | de577428a69a0002c3194afdf0562cf5a4dc1bdc (patch) | |
tree | 254c07d5b583f0d1288394bf333cd7cd4a049ff4 /src | |
parent | 913c80f317270b41f339048afa5d63278eecf8f4 (diff) | |
download | gnunet-go-de577428a69a0002c3194afdf0562cf5a4dc1bdc.tar.gz gnunet-go-de577428a69a0002c3194afdf0562cf5a4dc1bdc.zip |
Heartbeat handling in modules added.
Diffstat (limited to 'src')
-rw-r--r-- | src/gnunet/cmd/peer_mockup/main.go | 2 | ||||
-rw-r--r-- | src/gnunet/cmd/revoke-zonekey/main.go | 2 | ||||
-rw-r--r-- | src/gnunet/go.mod | 4 | ||||
-rw-r--r-- | src/gnunet/go.sum | 4 | ||||
-rw-r--r-- | src/gnunet/service/dht/module.go | 22 | ||||
-rw-r--r-- | src/gnunet/service/dht/routingtable.go | 120 | ||||
-rw-r--r-- | src/gnunet/service/dht/routingtable_test.go | 3 | ||||
-rw-r--r-- | src/gnunet/service/gns/module.go | 2 | ||||
-rw-r--r-- | src/gnunet/service/module.go | 23 | ||||
-rw-r--r-- | src/gnunet/service/revocation/module.go | 2 | ||||
-rw-r--r-- | src/gnunet/util/time.go | 29 |
11 files changed, 162 insertions, 51 deletions
diff --git a/src/gnunet/cmd/peer_mockup/main.go b/src/gnunet/cmd/peer_mockup/main.go index 4288fb1..58f4baf 100644 --- a/src/gnunet/cmd/peer_mockup/main.go +++ b/src/gnunet/cmd/peer_mockup/main.go | |||
@@ -71,7 +71,7 @@ func main() { | |||
71 | 71 | ||
72 | // handle messages coming from network | 72 | // handle messages coming from network |
73 | module := service.NewModuleImpl() | 73 | module := service.NewModuleImpl() |
74 | listener := module.Run(ctx, process, nil) | 74 | listener := module.Run(ctx, process, nil, 0, nil) |
75 | c.Register("mockup", listener) | 75 | c.Register("mockup", listener) |
76 | 76 | ||
77 | if !asServer { | 77 | if !asServer { |
diff --git a/src/gnunet/cmd/revoke-zonekey/main.go b/src/gnunet/cmd/revoke-zonekey/main.go index 298a7e4..aab9602 100644 --- a/src/gnunet/cmd/revoke-zonekey/main.go +++ b/src/gnunet/cmd/revoke-zonekey/main.go | |||
@@ -299,7 +299,7 @@ func main() { | |||
299 | } | 299 | } |
300 | } | 300 | } |
301 | // update elapsed time | 301 | // update elapsed time |
302 | rd.T.Add(util.AbsoluteTimeNow().Diff(startTime)) | 302 | rd.T.Add(startTime.Elapsed()) |
303 | rd.Last = last | 303 | rd.Last = last |
304 | 304 | ||
305 | log.Println("Writing revocation data to file...") | 305 | log.Println("Writing revocation data to file...") |
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod index 7383eaf..ad203ca 100644 --- a/src/gnunet/go.mod +++ b/src/gnunet/go.mod | |||
@@ -3,7 +3,7 @@ module gnunet | |||
3 | go 1.18 | 3 | go 1.18 |
4 | 4 | ||
5 | require ( | 5 | require ( |
6 | github.com/bfix/gospel v1.2.11 | 6 | github.com/bfix/gospel v1.2.14 |
7 | github.com/go-redis/redis/v8 v8.11.5 | 7 | github.com/go-redis/redis/v8 v8.11.5 |
8 | github.com/go-sql-driver/mysql v1.6.0 | 8 | github.com/go-sql-driver/mysql v1.6.0 |
9 | github.com/gorilla/mux v1.8.0 | 9 | github.com/gorilla/mux v1.8.0 |
@@ -21,5 +21,3 @@ require ( | |||
21 | golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect | 21 | golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect |
22 | golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect | 22 | golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect |
23 | ) | 23 | ) |
24 | |||
25 | replace github.com/bfix/gospel v1.2.11 => /vault/prj/libs/Go/Gospel | ||
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum index ea3c328..f2baf8e 100644 --- a/src/gnunet/go.sum +++ b/src/gnunet/go.sum | |||
@@ -1,5 +1,5 @@ | |||
1 | github.com/bfix/gospel v1.2.11 h1:z/c6MFNq/lz4mO8+PK60a3NvH+lbTKAlLCShuFFZUvg= | 1 | github.com/bfix/gospel v1.2.14 h1:lIdagJvkebG+uYbVdfK6XbT1udnq/ezd/Gi54EaMtV0= |
2 | github.com/bfix/gospel v1.2.11/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= | 2 | github.com/bfix/gospel v1.2.14/go.mod h1:cdu63bA9ZdfeDoqZ+vnWOcbY9Puwdzmf5DMxMGMznRI= |
3 | github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= | 3 | github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= |
4 | github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= | 4 | github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= |
5 | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= | 5 | github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= |
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go index d369f3f..5f04d54 100644 --- a/src/gnunet/service/dht/module.go +++ b/src/gnunet/service/dht/module.go | |||
@@ -26,6 +26,7 @@ import ( | |||
26 | "gnunet/service" | 26 | "gnunet/service" |
27 | "gnunet/service/dht/blocks" | 27 | "gnunet/service/dht/blocks" |
28 | "net/http" | 28 | "net/http" |
29 | "time" | ||
29 | ) | 30 | ) |
30 | 31 | ||
31 | //====================================================================== | 32 | //====================================================================== |
@@ -72,7 +73,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { | |||
72 | rtable: rt, | 73 | rtable: rt, |
73 | } | 74 | } |
74 | // register as listener for core events | 75 | // register as listener for core events |
75 | listener := m.Run(ctx, m.event, m.Filter()) | 76 | listener := m.Run(ctx, m.event, m.Filter(), 15*time.Minute, m.heartbeat) |
76 | c.Register("dht", listener) | 77 | c.Register("dht", listener) |
77 | 78 | ||
78 | return | 79 | return |
@@ -119,9 +120,24 @@ func (m *Module) Filter() *core.EventFilter { | |||
119 | return f | 120 | return f |
120 | } | 121 | } |
121 | 122 | ||
122 | // Event handler | 123 | // Event handler for infrastructure signals |
123 | func (nc *Module) event(ctx context.Context, ev *core.Event) { | 124 | func (m *Module) event(ctx context.Context, ev *core.Event) { |
125 | switch ev.ID { | ||
126 | // New peer connected: | ||
127 | case core.EV_CONNECT: | ||
128 | // Add peer to routing table | ||
124 | 129 | ||
130 | } | ||
131 | |||
132 | } | ||
133 | |||
134 | // Heartbeat handler for periodic tasks | ||
135 | func (m *Module) heartbeat(ctx context.Context) { | ||
136 | // update the estimated network size | ||
137 | m.rtable.l2nse = m.core.L2NSE() | ||
138 | |||
139 | // run heartbeat for routing table | ||
140 | m.rtable.heartbeat(ctx) | ||
125 | } | 141 | } |
126 | 142 | ||
127 | //---------------------------------------------------------------------- | 143 | //---------------------------------------------------------------------- |
diff --git a/src/gnunet/service/dht/routingtable.go b/src/gnunet/service/dht/routingtable.go index 895a1b2..0078b71 100644 --- a/src/gnunet/service/dht/routingtable.go +++ b/src/gnunet/service/dht/routingtable.go | |||
@@ -20,12 +20,15 @@ package dht | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "bytes" | 22 | "bytes" |
23 | "context" | ||
23 | "crypto/sha512" | 24 | "crypto/sha512" |
24 | "encoding/hex" | 25 | "encoding/hex" |
25 | "gnunet/util" | 26 | "gnunet/util" |
26 | "math/rand" | 27 | "math/rand" |
27 | "sync" | 28 | "sync" |
29 | "time" | ||
28 | 30 | ||
31 | "github.com/bfix/gospel/logger" | ||
29 | "github.com/bfix/gospel/math" | 32 | "github.com/bfix/gospel/math" |
30 | ) | 33 | ) |
31 | 34 | ||
@@ -48,7 +51,10 @@ const ( | |||
48 | // PeerAddress is the identifier for a peer in the DHT network. | 51 | // PeerAddress is the identifier for a peer in the DHT network. |
49 | // It is the SHA-512 hash of the PeerID (public Ed25519 key). | 52 | // It is the SHA-512 hash of the PeerID (public Ed25519 key). |
50 | type PeerAddress struct { | 53 | type PeerAddress struct { |
51 | addr [sizeAddr]byte | 54 | addr [sizeAddr]byte // hash value as bytes |
55 | connected bool // is peer connected? | ||
56 | lastSeen util.AbsoluteTime // time the peer was last seen | ||
57 | lastUsed util.AbsoluteTime // time the peer was last used | ||
52 | } | 58 | } |
53 | 59 | ||
54 | // NewPeerAddress returns the DHT address of a peer. | 60 | // NewPeerAddress returns the DHT address of a peer. |
@@ -57,6 +63,8 @@ func NewPeerAddress(peer *util.PeerID) *PeerAddress { | |||
57 | h := rtHash() | 63 | h := rtHash() |
58 | h.Write(peer.Key) | 64 | h.Write(peer.Key) |
59 | copy(r.addr[:], h.Sum(nil)) | 65 | copy(r.addr[:], h.Sum(nil)) |
66 | r.lastSeen = util.AbsoluteTimeNow() | ||
67 | r.lastUsed = util.AbsoluteTimeNow() | ||
60 | return r | 68 | return r |
61 | } | 69 | } |
62 | 70 | ||
@@ -90,36 +98,51 @@ func (addr *PeerAddress) Distance(p *PeerAddress) (*math.Int, int) { | |||
90 | // distance to the reference address, so smaller index means | 98 | // distance to the reference address, so smaller index means |
91 | // "nearer" to the reference address. | 99 | // "nearer" to the reference address. |
92 | type RoutingTable struct { | 100 | type RoutingTable struct { |
93 | ref *PeerAddress // reference address for distance | 101 | ref *PeerAddress // reference address for distance |
94 | buckets []*Bucket // list of buckets | 102 | buckets []*Bucket // list of buckets |
95 | list map[*PeerAddress]bool // keep list of peers | 103 | list map[*PeerAddress]struct{} // keep list of peers |
96 | rwlock sync.RWMutex // lock for write operations | 104 | rwlock sync.RWMutex // lock for write operations |
97 | l2nse float64 // log2 of estimated network size | 105 | l2nse float64 // log2 of estimated network size |
106 | inProcess bool // flag if Process() is running | ||
98 | } | 107 | } |
99 | 108 | ||
100 | // NewRoutingTable creates a new routing table for the reference address. | 109 | // NewRoutingTable creates a new routing table for the reference address. |
101 | func NewRoutingTable(ref *PeerAddress) *RoutingTable { | 110 | func NewRoutingTable(ref *PeerAddress) *RoutingTable { |
102 | rt := new(RoutingTable) | 111 | // create routing table |
103 | rt.ref = ref | 112 | rt := &RoutingTable{ |
104 | rt.list = make(map[*PeerAddress]bool) | 113 | ref: ref, |
105 | rt.buckets = make([]*Bucket, numBuckets) | 114 | list: make(map[*PeerAddress]struct{}), |
115 | buckets: make([]*Bucket, numBuckets), | ||
116 | l2nse: 0., | ||
117 | inProcess: false, | ||
118 | } | ||
119 | // fill buckets | ||
106 | for i := range rt.buckets { | 120 | for i := range rt.buckets { |
107 | rt.buckets[i] = NewBucket(numK) | 121 | rt.buckets[i] = NewBucket(numK) |
108 | } | 122 | } |
109 | return rt | 123 | return rt |
110 | } | 124 | } |
111 | 125 | ||
126 | //---------------------------------------------------------------------- | ||
127 | // Peer management | ||
128 | //---------------------------------------------------------------------- | ||
129 | |||
112 | // Add new peer address to routing table. | 130 | // Add new peer address to routing table. |
113 | // Returns true if the entry was added, false otherwise. | 131 | // Returns true if the entry was added, false otherwise. |
114 | func (rt *RoutingTable) Add(p *PeerAddress, connected bool) bool { | 132 | func (rt *RoutingTable) Add(p *PeerAddress) bool { |
115 | // ensure one write and no readers | 133 | // ensure one write and no readers |
116 | rt.rwlock.Lock() | 134 | rt.rwlock.Lock() |
117 | defer rt.rwlock.Unlock() | 135 | defer rt.rwlock.Unlock() |
118 | 136 | ||
137 | // check if peer is already known | ||
138 | if _, ok := rt.list[p]; ok { | ||
139 | return false | ||
140 | } | ||
141 | |||
119 | // compute distance (bucket index) and insert address. | 142 | // compute distance (bucket index) and insert address. |
120 | _, idx := p.Distance(rt.ref) | 143 | _, idx := p.Distance(rt.ref) |
121 | if rt.buckets[idx].Add(p, connected) { | 144 | if rt.buckets[idx].Add(p) { |
122 | rt.list[p] = true | 145 | rt.list[p] = struct{}{} |
123 | return true | 146 | return true |
124 | } | 147 | } |
125 | // Full bucket: we did not add the address to the routing table. | 148 | // Full bucket: we did not add the address to the routing table. |
@@ -139,11 +162,23 @@ func (rt *RoutingTable) Remove(p *PeerAddress) bool { | |||
139 | delete(rt.list, p) | 162 | delete(rt.list, p) |
140 | return true | 163 | return true |
141 | } | 164 | } |
165 | // remove from internal list | ||
166 | delete(rt.list, p) | ||
142 | return false | 167 | return false |
143 | } | 168 | } |
144 | 169 | ||
145 | //---------------------------------------------------------------------- | 170 | //---------------------------------------------------------------------- |
146 | // routing functions | 171 | |
172 | // Process a function f in the locked context of a routing table | ||
173 | func (rt *RoutingTable) Process(f func() error) error { | ||
174 | // ensure one write and no readers | ||
175 | rt.rwlock.Lock() | ||
176 | defer rt.rwlock.Unlock() | ||
177 | return f() | ||
178 | } | ||
179 | |||
180 | //---------------------------------------------------------------------- | ||
181 | // Routing functions | ||
147 | //---------------------------------------------------------------------- | 182 | //---------------------------------------------------------------------- |
148 | 183 | ||
149 | // SelectClosestPeer for a given peer address and bloomfilter. | 184 | // SelectClosestPeer for a given peer address and bloomfilter. |
@@ -160,6 +195,8 @@ func (rt *RoutingTable) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) ( | |||
160 | n = k | 195 | n = k |
161 | } | 196 | } |
162 | } | 197 | } |
198 | // mark peer as used | ||
199 | n.lastUsed = util.AbsoluteTimeNow() | ||
163 | return | 200 | return |
164 | } | 201 | } |
165 | 202 | ||
@@ -175,6 +212,8 @@ func (rt *RoutingTable) SelectRandomPeer(bf *PeerBloomFilter) *PeerAddress { | |||
175 | idx := rand.Intn(size) | 212 | idx := rand.Intn(size) |
176 | for k := range rt.list { | 213 | for k := range rt.list { |
177 | if idx == 0 { | 214 | if idx == 0 { |
215 | // mark peer as used | ||
216 | k.lastUsed = util.AbsoluteTimeNow() | ||
178 | return k | 217 | return k |
179 | } | 218 | } |
180 | idx-- | 219 | idx-- |
@@ -221,33 +260,50 @@ func (rt *RoutingTable) ComputeOutDegree(repl, hop int) int { | |||
221 | return 1 + int(rm1/(rt.l2nse+rm1*hf)) | 260 | return 1 + int(rm1/(rt.l2nse+rm1*hf)) |
222 | } | 261 | } |
223 | 262 | ||
263 | //---------------------------------------------------------------------- | ||
264 | |||
265 | // Heartbeat handler for periodic tasks | ||
266 | func (rt *RoutingTable) heartbeat(ctx context.Context) { | ||
267 | |||
268 | // check for dead or expired peers | ||
269 | timeout := util.NewRelativeTime(3 * time.Hour) | ||
270 | if err := rt.Process(func() error { | ||
271 | for addr := range rt.list { | ||
272 | if addr.connected { | ||
273 | continue | ||
274 | } | ||
275 | // check if we can/need to drop a peer | ||
276 | drop := timeout.Compare(addr.lastSeen.Elapsed()) < 0 | ||
277 | if drop || timeout.Compare(addr.lastUsed.Elapsed()) < 0 { | ||
278 | rt.Remove(addr) | ||
279 | } | ||
280 | } | ||
281 | return nil | ||
282 | }); err != nil { | ||
283 | logger.Println(logger.ERROR, "[dht] RT heartbeat: "+err.Error()) | ||
284 | } | ||
285 | } | ||
286 | |||
224 | //====================================================================== | 287 | //====================================================================== |
225 | // Routing table buckets | 288 | // Routing table buckets |
226 | //====================================================================== | 289 | //====================================================================== |
227 | 290 | ||
228 | // PeerEntry in a k-Bucket: use routing specific attributes | ||
229 | // for book-keeping | ||
230 | type PeerEntry struct { | ||
231 | addr *PeerAddress // peer address | ||
232 | connected bool // is peer connected? | ||
233 | } | ||
234 | |||
235 | // Bucket holds peer entries with approx. same distance from node | 291 | // Bucket holds peer entries with approx. same distance from node |
236 | type Bucket struct { | 292 | type Bucket struct { |
237 | list []*PeerEntry | 293 | list []*PeerAddress |
238 | rwlock sync.RWMutex | 294 | rwlock sync.RWMutex |
239 | } | 295 | } |
240 | 296 | ||
241 | // NewBucket creates a new entry list of given size | 297 | // NewBucket creates a new entry list of given size |
242 | func NewBucket(n int) *Bucket { | 298 | func NewBucket(n int) *Bucket { |
243 | return &Bucket{ | 299 | return &Bucket{ |
244 | list: make([]*PeerEntry, 0, n), | 300 | list: make([]*PeerAddress, 0, n), |
245 | } | 301 | } |
246 | } | 302 | } |
247 | 303 | ||
248 | // Add peer address to the bucket if there is free space. | 304 | // Add peer address to the bucket if there is free space. |
249 | // Returns true if entry is added, false otherwise. | 305 | // Returns true if entry is added, false otherwise. |
250 | func (b *Bucket) Add(p *PeerAddress, connected bool) bool { | 306 | func (b *Bucket) Add(p *PeerAddress) bool { |
251 | // only one writer and no readers | 307 | // only one writer and no readers |
252 | b.rwlock.Lock() | 308 | b.rwlock.Lock() |
253 | defer b.rwlock.Unlock() | 309 | defer b.rwlock.Unlock() |
@@ -255,11 +311,7 @@ func (b *Bucket) Add(p *PeerAddress, connected bool) bool { | |||
255 | // check for free space in bucket | 311 | // check for free space in bucket |
256 | if len(b.list) < numK { | 312 | if len(b.list) < numK { |
257 | // append entry at the end | 313 | // append entry at the end |
258 | pe := &PeerEntry{ | 314 | b.list = append(b.list, p) |
259 | addr: p, | ||
260 | connected: connected, | ||
261 | } | ||
262 | b.list = append(b.list, pe) | ||
263 | return true | 315 | return true |
264 | } | 316 | } |
265 | return false | 317 | return false |
@@ -273,7 +325,7 @@ func (b *Bucket) Remove(p *PeerAddress) bool { | |||
273 | defer b.rwlock.Unlock() | 325 | defer b.rwlock.Unlock() |
274 | 326 | ||
275 | for i, pe := range b.list { | 327 | for i, pe := range b.list { |
276 | if pe.addr.Equals(p) { | 328 | if pe.Equals(p) { |
277 | // found entry: remove it | 329 | // found entry: remove it |
278 | b.list = append(b.list[:i], b.list[i+1:]...) | 330 | b.list = append(b.list[:i], b.list[i+1:]...) |
279 | return true | 331 | return true |
@@ -289,16 +341,16 @@ func (b *Bucket) SelectClosestPeer(p *PeerAddress, bf *PeerBloomFilter) (n *Peer | |||
289 | b.rwlock.RLock() | 341 | b.rwlock.RLock() |
290 | defer b.rwlock.RUnlock() | 342 | defer b.rwlock.RUnlock() |
291 | 343 | ||
292 | for _, pe := range b.list { | 344 | for _, addr := range b.list { |
293 | // skip addresses in bloomfilter | 345 | // skip addresses in bloomfilter |
294 | if bf.Contains(pe.addr) { | 346 | if bf.Contains(addr) { |
295 | continue | 347 | continue |
296 | } | 348 | } |
297 | // check for shorter distance | 349 | // check for shorter distance |
298 | if d, _ := p.Distance(pe.addr); n == nil || d.Cmp(dist) < 0 { | 350 | if d, _ := p.Distance(addr); n == nil || d.Cmp(dist) < 0 { |
299 | // remember best match | 351 | // remember best match |
300 | dist = d | 352 | dist = d |
301 | n = pe.addr | 353 | n = addr |
302 | } | 354 | } |
303 | } | 355 | } |
304 | return | 356 | return |
diff --git a/src/gnunet/service/dht/routingtable_test.go b/src/gnunet/service/dht/routingtable_test.go index 2579356..659f9d4 100644 --- a/src/gnunet/service/dht/routingtable_test.go +++ b/src/gnunet/service/dht/routingtable_test.go | |||
@@ -87,7 +87,8 @@ func TestRT(t *testing.T) { | |||
87 | 87 | ||
88 | // actions: | 88 | // actions: |
89 | connected := func(task *Entry, e int64, msg string) { | 89 | connected := func(task *Entry, e int64, msg string) { |
90 | rt.Add(task.addr, true) | 90 | task.addr.connected = true |
91 | rt.Add(task.addr) | ||
91 | task.online = true | 92 | task.online = true |
92 | task.last = e | 93 | task.last = e |
93 | t.Logf("[%6d] %s %s\n", e, task.addr, msg) | 94 | t.Logf("[%6d] %s %s\n", e, task.addr, msg) |
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go index 31ad6c7..4878aa0 100644 --- a/src/gnunet/service/gns/module.go +++ b/src/gnunet/service/gns/module.go | |||
@@ -103,7 +103,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { | |||
103 | ModuleImpl: *service.NewModuleImpl(), | 103 | ModuleImpl: *service.NewModuleImpl(), |
104 | } | 104 | } |
105 | // register as listener for core events | 105 | // register as listener for core events |
106 | listener := m.Run(ctx, m.event, m.Filter()) | 106 | listener := m.Run(ctx, m.event, m.Filter(), 0, nil) |
107 | c.Register("gns", listener) | 107 | c.Register("gns", listener) |
108 | 108 | ||
109 | return | 109 | return |
diff --git a/src/gnunet/service/module.go b/src/gnunet/service/module.go index 98307d6..5f95975 100644 --- a/src/gnunet/service/module.go +++ b/src/gnunet/service/module.go | |||
@@ -22,6 +22,7 @@ import ( | |||
22 | "context" | 22 | "context" |
23 | "gnunet/core" | 23 | "gnunet/core" |
24 | "net/http" | 24 | "net/http" |
25 | "time" | ||
25 | ) | 26 | ) |
26 | 27 | ||
27 | // Module is an interface for GNUnet service modules (workers). | 28 | // Module is an interface for GNUnet service modules (workers). |
@@ -36,6 +37,9 @@ type Module interface { | |||
36 | // EventHandler is a function prototype for event handling | 37 | // EventHandler is a function prototype for event handling |
37 | type EventHandler func(context.Context, *core.Event) | 38 | type EventHandler func(context.Context, *core.Event) |
38 | 39 | ||
40 | // Heartbeat is a function prototype for periodic tasks | ||
41 | type Heartbeat func(context.Context) | ||
42 | |||
39 | // ModuleImpl is an event-handling type used by Module implementations. | 43 | // ModuleImpl is an event-handling type used by Module implementations. |
40 | type ModuleImpl struct { | 44 | type ModuleImpl struct { |
41 | ch chan *core.Event // channel for core events. | 45 | ch chan *core.Event // channel for core events. |
@@ -49,9 +53,19 @@ func NewModuleImpl() (m *ModuleImpl) { | |||
49 | } | 53 | } |
50 | 54 | ||
51 | // Run event handling loop | 55 | // Run event handling loop |
52 | func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter *core.EventFilter) (listener *core.Listener) { | 56 | func (m *ModuleImpl) Run( |
57 | ctx context.Context, | ||
58 | hdlr EventHandler, filter *core.EventFilter, | ||
59 | pulse time.Duration, heartbeat Heartbeat, | ||
60 | ) (listener *core.Listener) { | ||
53 | // listener for registration | 61 | // listener for registration |
54 | listener = core.NewListener(m.ch, filter) | 62 | listener = core.NewListener(m.ch, filter) |
63 | |||
64 | // if no heartbeat handler is defined, set pulse to near flatline. | ||
65 | if heartbeat == nil { | ||
66 | pulse = 365 * 24 * time.Hour // once a year | ||
67 | } | ||
68 | tick := time.Tick(pulse) | ||
55 | // run event loop | 69 | // run event loop |
56 | go func() { | 70 | go func() { |
57 | for { | 71 | for { |
@@ -63,6 +77,13 @@ func (m *ModuleImpl) Run(ctx context.Context, hdlr EventHandler, filter *core.Ev | |||
63 | // wait for terminate signal | 77 | // wait for terminate signal |
64 | case <-ctx.Done(): | 78 | case <-ctx.Done(): |
65 | return | 79 | return |
80 | |||
81 | // handle heartbeat | ||
82 | case <-tick: | ||
83 | // check for defined heartbeat handler | ||
84 | if heartbeat != nil { | ||
85 | heartbeat(ctx) | ||
86 | } | ||
66 | } | 87 | } |
67 | } | 88 | } |
68 | }() | 89 | }() |
diff --git a/src/gnunet/service/revocation/module.go b/src/gnunet/service/revocation/module.go index 37b57ab..1f0ab48 100644 --- a/src/gnunet/service/revocation/module.go +++ b/src/gnunet/service/revocation/module.go | |||
@@ -74,7 +74,7 @@ func NewModule(ctx context.Context, c *core.Core) (m *Module) { | |||
74 | return nil | 74 | return nil |
75 | } | 75 | } |
76 | // register as listener for core events | 76 | // register as listener for core events |
77 | listener := m.Run(ctx, m.event, m.Filter()) | 77 | listener := m.Run(ctx, m.event, m.Filter(), 0, nil) |
78 | c.Register("gns", listener) | 78 | c.Register("gns", listener) |
79 | return m | 79 | return m |
80 | } | 80 | } |
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go index 70b91e1..53589b8 100644 --- a/src/gnunet/util/time.go +++ b/src/gnunet/util/time.go | |||
@@ -81,16 +81,28 @@ func (t AbsoluteTime) Add(d time.Duration) AbsoluteTime { | |||
81 | } | 81 | } |
82 | } | 82 | } |
83 | 83 | ||
84 | // Elapsed time since 't'. Return 0 if 't' is in the future. | ||
85 | func (t AbsoluteTime) Elapsed() RelativeTime { | ||
86 | dt, elapsed := t.Diff(AbsoluteTimeNow()) | ||
87 | if !elapsed { | ||
88 | dt = NewRelativeTime(0) | ||
89 | } | ||
90 | return dt | ||
91 | } | ||
92 | |||
84 | // Diff returns the relative time between two absolute times; | 93 | // Diff returns the relative time between two absolute times; |
85 | // the ordering of the absolute times doesn't matter. | 94 | // returns true if t2 is after t1. |
86 | func (t AbsoluteTime) Diff(t2 AbsoluteTime) RelativeTime { | 95 | func (t AbsoluteTime) Diff(t2 AbsoluteTime) (dt RelativeTime, elapsed bool) { |
87 | var d uint64 | 96 | var d uint64 |
88 | if t.Compare(t2) == 1 { | 97 | if t.Compare(t2) == 1 { |
89 | d = t.Val - t2.Val | 98 | d = t.Val - t2.Val |
99 | elapsed = false | ||
90 | } else { | 100 | } else { |
91 | d = t2.Val - t.Val | 101 | d = t2.Val - t.Val |
102 | elapsed = true | ||
92 | } | 103 | } |
93 | return RelativeTime{d} | 104 | dt = RelativeTime{d} |
105 | return | ||
94 | } | 106 | } |
95 | 107 | ||
96 | // Expired returns true if the timestamp is in the past. | 108 | // Expired returns true if the timestamp is in the past. |
@@ -150,3 +162,14 @@ func (t RelativeTime) String() string { | |||
150 | func (t RelativeTime) Add(t2 RelativeTime) { | 162 | func (t RelativeTime) Add(t2 RelativeTime) { |
151 | t.Val += t2.Val | 163 | t.Val += t2.Val |
152 | } | 164 | } |
165 | |||
166 | // Compare two durations | ||
167 | func (t RelativeTime) Compare(t2 RelativeTime) int { | ||
168 | switch { | ||
169 | case t.Val < t2.Val: | ||
170 | return -1 | ||
171 | case t.Val > t2.Val: | ||
172 | return 1 | ||
173 | } | ||
174 | return 0 | ||
175 | } | ||