gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit 2a2a558aa553aca3394513114d13d4888ae822db
parent be8e465cd061612133282db80ab1c7b7387ee251
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Tue, 31 Mar 2020 13:12:30 +0200

Handle DHT lookup aborts.

Diffstat:
Msrc/cmd/gnunet-service-gns-go/main.go | 2++
Msrc/cmd/peer_mockup/process.go | 26++++++++++++++++----------
Msrc/gnunet/go.mod | 4++--
Msrc/gnunet/go.sum | 5+++++
Msrc/gnunet/message/factory.go | 2++
Msrc/gnunet/message/msg_dht.go | 37+++++++++++++++++++++++++++++++++++++
Msrc/gnunet/modules.go | 11+++++++++--
Msrc/gnunet/service/client.go | 35++++++++++++++++++++++++-----------
Asrc/gnunet/service/context.go | 68++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/gnunet/service/dht/module.go | 49+++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/gnunet/service/gns/dns.go | 13++++++++++---
Msrc/gnunet/service/gns/module.go | 109+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
Msrc/gnunet/service/gns/service.go | 71+++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
Msrc/gnunet/service/namecache/module.go | 5+++--
Msrc/gnunet/service/service.go | 52+++++++++++++++++++++++++++++++++++++++++-----------
Msrc/gnunet/transport/channel.go | 32++++++++++++++++++++++++--------
Msrc/gnunet/transport/channel_netw.go | 92+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Msrc/gnunet/transport/connection.go | 10++++++----
Msrc/gnunet/util/misc.go | 1+
Msrc/gnunet/util/time.go | 19+++++++++++++++++++
20 files changed, 547 insertions(+), 96 deletions(-)

diff --git a/src/cmd/gnunet-service-gns-go/main.go b/src/cmd/gnunet-service-gns-go/main.go @@ -89,6 +89,8 @@ loop: break loop case syscall.SIGHUP: logger.Println(logger.INFO, "[gns] SIGHUP") + case syscall.SIGURG: + // TODO: https://github.com/golang/go/issues/37942 default: logger.Println(logger.INFO, "[gns] Unhandled signal: "+sig.String()) } diff --git a/src/cmd/peer_mockup/process.go b/src/cmd/peer_mockup/process.go @@ -9,6 +9,12 @@ import ( "gnunet/message" "gnunet/transport" "gnunet/util" + + "github.com/bfix/gospel/concurrent" +) + +var ( + sig = concurrent.NewSignaller() ) func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { @@ -20,7 +26,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { in := make(chan message.Message) go func() { for { - msg, err := c.Receive() + msg, err := c.Receive(sig) if err != nil { fmt.Printf("Receive: %s\n", err.Error()) return @@ -33,7 +39,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { init := (from == p) if init { peerid := util.NewPeerID(p.GetID()) - c.Send(message.NewTransportTcpWelcomeMsg(peerid)) + c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig) } // remember peer addresses (only ONE!) @@ -53,11 +59,11 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { case *message.TransportTcpWelcomeMsg: peerid := util.NewPeerID(p.GetID()) if init { - c.Send(message.NewHelloMsg(peerid)) + c.Send(message.NewHelloMsg(peerid), sig) target := util.NewPeerID(t.GetID()) - c.Send(message.NewTransportPingMsg(target, tAddr)) + c.Send(message.NewTransportPingMsg(target, tAddr), sig) } else { - c.Send(message.NewTransportTcpWelcomeMsg(peerid)) + c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig) } case *message.HelloMsg: @@ -67,7 +73,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { if err := mOut.Sign(p.PrvKey()); err != nil { return err } - c.Send(mOut) + c.Send(mOut, sig) case *message.TransportPongMsg: rc, err := msg.Verify(t.PubKey()) @@ -79,14 +85,14 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { } send[message.TRANSPORT_PONG] = true if mOut, ok := pending[message.TRANSPORT_SESSION_SYN]; ok { - c.Send(mOut) + c.Send(mOut, sig) } case *message.SessionSynMsg: mOut := message.NewSessionSynAckMsg() mOut.Timestamp = msg.Timestamp if send[message.TRANSPORT_PONG] { - c.Send(mOut) + c.Send(mOut, sig) } else { pending[message.TRANSPORT_SESSION_SYN] = mOut } @@ -97,7 +103,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { case *message.SessionAckMsg: case *message.SessionKeepAliveMsg: - c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce)) + c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce), sig) case *message.EphemeralKeyMsg: rc, err := msg.Verify(t.PubKey()) @@ -108,7 +114,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { return errors.New("EPHKEY verification failed") } t.SetEphKeyMsg(msg) - c.Send(p.EphKeyMsg()) + c.Send(p.EphKeyMsg(), sig) secret := crypto.SharedSecret(p.EphPrvKey(), t.EphKeyMsg().Public()) c.SharedSecret(util.Clone(secret.Bits[:])) diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod @@ -3,7 +3,7 @@ module gnunet go 1.12 require ( - github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 + github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46 github.com/miekg/dns v1.1.26 - golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 + golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 ) diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum @@ -1,5 +1,7 @@ github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ= github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA= +github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46 h1:5aNd1/ISbO1ltgmyUGza7kdaN4fD/Qal6uKZk9goMhw= +github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA= github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -7,6 +9,8 @@ golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 h1:Gv7RPwsi3eZ2Fgewe3CBsu golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= @@ -19,6 +23,7 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/src/gnunet/message/factory.go b/src/gnunet/message/factory.go @@ -59,6 +59,8 @@ func NewEmptyMessage(msgType uint16) (Message, error) { //------------------------------------------------------------------ case DHT_CLIENT_GET: return NewDHTClientGetMsg(nil), nil + case DHT_CLIENT_GET_STOP: + return NewDHTClientGetStopMsg(nil), nil case DHT_CLIENT_RESULT: return NewDHTClientResultMsg(nil), nil diff --git a/src/gnunet/message/msg_dht.go b/src/gnunet/message/msg_dht.go @@ -126,3 +126,40 @@ func (m *DHTClientResultMsg) String() string { func (msg *DHTClientResultMsg) Header() *MessageHeader { return &MessageHeader{msg.MsgSize, msg.MsgType} } + +//---------------------------------------------------------------------- +// DHT_CLIENT_GET_STOP +//---------------------------------------------------------------------- + +// DHTClientGetStopMsg +type DHTClientGetStopMsg struct { + MsgSize uint16 `order:"big"` // total size of message + MsgType uint16 `order:"big"` // DHT_CLIENT_GET_STOP (144) + Reserved uint32 `order:"big"` // Reserved (0) + Id uint64 `order:"big"` // Unique ID identifying this request + Key *crypto.HashCode // The key to search for +} + +// NewDHTClientGetStopMsg creates a new default DHTClientGetStopMsg object. +func NewDHTClientGetStopMsg(key *crypto.HashCode) *DHTClientGetStopMsg { + if key == nil { + key = new(crypto.HashCode) + } + return &DHTClientGetStopMsg{ + MsgSize: 80, + MsgType: DHT_CLIENT_GET_STOP, + Reserved: 0, // mandatory + Id: 0, + Key: key, + } +} + +// String returns a human-readable representation of the message. +func (m *DHTClientGetStopMsg) String() string { + return fmt.Sprintf("DHTClientGetStopMsg{Id:%d,Key=%s}", m.Id, hex.EncodeToString(m.Key.Bits)) +} + +// Header returns the message header in a separate instance. +func (msg *DHTClientGetStopMsg) Header() *MessageHeader { + return &MessageHeader{msg.MsgSize, msg.MsgType} +} diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go @@ -29,6 +29,7 @@ package gnunet import ( + "gnunet/service/dht" "gnunet/service/gns" "gnunet/service/namecache" ) @@ -37,6 +38,7 @@ import ( type Instances struct { GNS *gns.GNSModule Namecache *namecache.NamecacheModule + DHT *dht.DHTModule } // Local reference to instance list @@ -50,9 +52,14 @@ func init() { // Namecache (no calls to other modules) Modules.Namecache = new(namecache.NamecacheModule) + // DHT (no calls to other modules) + Modules.DHT = new(dht.DHTModule) + // GNS (calls Namecache, DHT and Identity) Modules.GNS = &gns.GNSModule{ - LookupLocal: Modules.Namecache.Get, - StoreLocal: Modules.Namecache.Put, + LookupLocal: Modules.Namecache.Get, + StoreLocal: Modules.Namecache.Put, + LookupRemote: Modules.DHT.Get, + CancelRemote: Modules.DHT.Cancel, } } diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go @@ -25,36 +25,49 @@ import ( "github.com/bfix/gospel/logger" ) -// Client +// Client type: Use to perform client-side interactions with GNUnet services. type Client struct { - ch *transport.MsgChannel + ch *transport.MsgChannel // channel for message exchange } -// NewClient +// NewClient creates a new client instance for the given channel endpoint. func NewClient(endp string) (*Client, error) { - // + // create a new channel to endpoint. ch, err := transport.NewChannel(endp) if err != nil { return nil, err } + // wrap into a message channel for the client. return &Client{ ch: transport.NewMsgChannel(ch), }, nil } -func (c *Client) SendRequest(req message.Message) error { - return c.ch.Send(req) +// SendRequest sends a give message to the service. +func (c *Client) SendRequest(ctx *SessionContext, req message.Message) error { + return c.ch.Send(req, ctx.Signaller()) } -func (c *Client) ReceiveResponse() (message.Message, error) { - return c.ch.Receive() +// ReceiveResponse waits for a response from the service; it can be interrupted +// by sending "false" to the cmd channel. +func (c *Client) ReceiveResponse(ctx *SessionContext) (message.Message, error) { + return c.ch.Receive(ctx.Signaller()) } +// Close a client; no further message exchange is possible. func (c *Client) Close() error { return c.ch.Close() } -func ServiceRequestResponse(caller, callee, endp string, req message.Message) (message.Message, error) { +// ServiceRequestResponse is a helper method for a one request - one response +// secenarios of client/serice interactions. +func ServiceRequestResponse( + ctx *SessionContext, + caller string, + callee string, + endp string, + req message.Message) (message.Message, error) { + // client-connect to the service logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) cl, err := NewClient(endp) @@ -63,13 +76,13 @@ func ServiceRequestResponse(caller, callee, endp string, req message.Message) (m } // send request logger.Printf(logger.DBG, "[%s] Sending request to %s service\n", caller, callee) - if err = cl.SendRequest(req); err != nil { + if err = cl.SendRequest(ctx, req); err != nil { return nil, err } // wait for a single response, then close the connection logger.Printf(logger.DBG, "[%s] Waiting for response from %s service\n", caller, callee) var resp message.Message - if resp, err = cl.ReceiveResponse(); err != nil { + if resp, err = cl.ReceiveResponse(ctx); err != nil { return nil, err } logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n", caller, callee) diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go @@ -0,0 +1,68 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019, 2020 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package service + +import ( + "sync" + + "gnunet/util" + + "github.com/bfix/gospel/concurrent" +) + +// SessionContext is used to set a context for each client connection handled +// by a service; the session is handled by the 'ServeClient' method of a +// service implementation. +type SessionContext struct { + Id int // session identifier + wg *sync.WaitGroup // wait group for the session + sig *concurrent.Signaller // signaller for the session +} + +// NewSessionContext instantiates a new session context. +func NewSessionContext() *SessionContext { + return &SessionContext{ + Id: util.NextID(), + wg: new(sync.WaitGroup), + sig: concurrent.NewSignaller(), + } +} + +// Cancel all go-routines associated with this context. +func (ctx *SessionContext) Cancel() { + // send signal to terminate... + ctx.sig.Send(true) + // wait for session go-routines to finish + ctx.wg.Wait() +} + +// Add a go-routine to the wait group. +func (ctx *SessionContext) Add() { + ctx.wg.Add(1) +} + +// Remove a go-routine from the wait group. +func (ctx *SessionContext) Remove() { + ctx.wg.Done() +} + +// Signaller returns the working instance for the context. +func (ctx *SessionContext) Signaller() *concurrent.Signaller { + return ctx.sig +} diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -0,0 +1,49 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019, 2020 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package dht + +import ( + "gnunet/message" + "gnunet/service" + "gnunet/service/gns" +) + +//====================================================================== +// "DHT" implementation +//====================================================================== + +//---------------------------------------------------------------------- +// Put and get blocks into/from a DHT. +//---------------------------------------------------------------------- + +// DHT handles the permanent storage of blocks under the query key. +type DHTModule struct { +} + +func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) (*message.GNSBlock, error) { + return nil, nil +} + +func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) error { + return nil +} + +func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { + return nil +} diff --git a/src/gnunet/service/gns/dns.go b/src/gnunet/service/gns/dns.go @@ -26,6 +26,7 @@ import ( "gnunet/enums" "gnunet/message" + "gnunet/service" "gnunet/util" "github.com/bfix/gospel/crypto/ed25519" @@ -202,10 +203,16 @@ func QueryDNS(id int, name string, server net.IP, kind RRTypeList) *message.GNSR // ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in // parallel; the first result delivered by any of the servers is returned // as the result list of matching resource records. -func (gns *GNSModule) ResolveDNS(name string, servers []string, kind RRTypeList, pkey *ed25519.PublicKey, depth int) (set *message.GNSRecordSet, err error) { - logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", name) +func (gns *GNSModule) ResolveDNS( + ctx *service.SessionContext, + name string, + servers []string, + kind RRTypeList, + pkey *ed25519.PublicKey, + depth int) (set *message.GNSRecordSet, err error) { // start DNS queries concurrently + logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", name) res := make(chan *message.GNSRecordSet) running := 0 for _, srv := range servers { @@ -215,7 +222,7 @@ func (gns *GNSModule) ResolveDNS(name string, servers []string, kind RRTypeList, if addr == nil { // no, it is a name... try to resolve an IP address from the name query := NewRRTypeList(enums.GNS_TYPE_DNS_A, enums.GNS_TYPE_DNS_AAAA) - if set, err = gns.ResolveUnknown(srv, nil, pkey, query, depth+1); err != nil { + if set, err = gns.ResolveUnknown(ctx, srv, nil, pkey, query, depth+1); err != nil { logger.Printf(logger.ERROR, "[dns] Can't resolve NS server '%s': %s\n", srv, err.Error()) continue } diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go @@ -26,6 +26,8 @@ import ( "gnunet/crypto" "gnunet/enums" "gnunet/message" + "gnunet/service" + "gnunet/transport" "gnunet/util" "github.com/bfix/gospel/crypto/ed25519" @@ -110,15 +112,22 @@ func NewQuery(pkey *ed25519.PublicKey, label string) *Query { // GNSModule handles the resolution of GNS names to RRs bundled in a block. type GNSModule struct { // Use function references for calls to methods in other modules: - // - LookupLocal func(query *Query) (*message.GNSBlock, error) - StoreLocal func(block *message.GNSBlock) error - LookupRemote func(query *Query) (*message.GNSBlock, error) + LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) + StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error + LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) + CancelRemote func(ctx *service.SessionContext, query *Query) error } // Resolve a GNS name with multiple labels. If pkey is not nil, the name // is interpreted as "relative to current zone". -func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { +func (gns *GNSModule) Resolve( + ctx *service.SessionContext, + path string, + pkey *ed25519.PublicKey, + kind RRTypeList, + mode int, + depth int) (set *message.GNSRecordSet, err error) { + // check for recursion depth if depth > config.Cfg.GNS.MaxDepth { return nil, ErrGNSRecursionExceeded @@ -130,14 +139,20 @@ func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind RRTypeL // check for relative path if pkey != nil { //resolve relative path - return gns.ResolveRelative(names, pkey, kind, mode, depth) + return gns.ResolveRelative(ctx, names, pkey, kind, mode, depth) } // resolve absolute path - return gns.ResolveAbsolute(names, kind, mode, depth) + return gns.ResolveAbsolute(ctx, names, kind, mode, depth) } // Resolve a fully qualified GNS absolute name (with multiple labels). -func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { +func (gns *GNSModule) ResolveAbsolute( + ctx *service.SessionContext, + labels []string, + kind RRTypeList, + mode int, + depth int) (set *message.GNSRecordSet, err error) { + // get the zone key for the TLD pkey := gns.GetZoneKey(labels[0]) if pkey == nil { @@ -146,12 +161,19 @@ func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode int return } // continue with resolution relative to a zone. - return gns.ResolveRelative(labels[1:], pkey, kind, mode, depth) + return gns.ResolveRelative(ctx, labels[1:], pkey, kind, mode, depth) } // Resolve relative path (to a given zone) recursively by processing simple // (PKEY,Label) lookups in sequence and handle intermediate GNS record types -func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { +func (gns *GNSModule) ResolveRelative( + ctx *service.SessionContext, + labels []string, + pkey *ed25519.PublicKey, + kind RRTypeList, + mode int, + depth int) (set *message.GNSRecordSet, err error) { + // Process all names in sequence var ( records []*message.GNSResourceRecord // final resource records from resolution @@ -162,7 +184,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, // resolve next level var block *message.GNSBlock - if block, err = gns.Lookup(pkey, labels[0], mode); err != nil { + if block, err = gns.Lookup(ctx, pkey, labels[0], mode); err != nil { // failed to resolve name return } @@ -225,10 +247,23 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, lbls += "." } fqdn := lbls + inst.Query - if set, err = gns.ResolveDNS(fqdn, inst.Servers, kind, pkey, depth); err != nil { + if set, err = gns.ResolveDNS(ctx, fqdn, inst.Servers, kind, pkey, depth); err != nil { logger.Println(logger.ERROR, "[gns] GNS2DNS resolution failed.") return } + // add synthetic LEHO record if we have results and are at the + // end of the name (labels). + if len(set.Records) > 0 && len(labels) == 1 { + // add LEHO supplemental record: The TTL of the new record is + // the longest-living record in the current set. + expires := util.AbsoluteTimeNow() + for _, rec := range set.Records { + if rec.Expires.Compare(expires) > 0 { + expires = rec.Expires + } + } + set.Records = append(set.Records, gns.newLEHORecord(inst.Query, expires)) + } // we are done with resolution; pass on records to caller records = set.Records break @@ -250,7 +285,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, break } logger.Println(logger.DBG, "[gns] CNAME resolution required.") - if set, err = gns.ResolveUnknown(inst.name, labels, pkey, kind, depth+1); err != nil { + if set, err = gns.ResolveUnknown(ctx, inst.name, labels, pkey, kind, depth+1); err != nil { logger.Println(logger.ERROR, "[gns] CNAME resolution failed.") return } @@ -300,7 +335,14 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, // relative to the zone PKEY. If the name is an absolute GNS name (ending in // a PKEY TLD), it is also resolved with GNS. All other names are resolved // via DNS queries. -func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey *ed25519.PublicKey, kind RRTypeList, depth int) (set *message.GNSRecordSet, err error) { +func (gns *GNSModule) ResolveUnknown( + ctx *service.SessionContext, + name string, + labels []string, + pkey *ed25519.PublicKey, + kind RRTypeList, + depth int) (set *message.GNSRecordSet, err error) { + // relative GNS-based server name? if strings.HasSuffix(name, ".+") { // resolve server name relative to current zone @@ -308,14 +350,14 @@ func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey *ed25519 for _, label := range util.ReverseStringList(labels) { name += "." + label } - if set, err = gns.Resolve(name, pkey, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { + if set, err = gns.Resolve(ctx, name, pkey, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { return } } else { // check for absolute GNS name (with PKEY as TLD) if zk := gns.GetZoneKey(name); zk != nil { // resolve absolute GNS name (name ends in a PKEY) - if set, err = gns.Resolve(util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { + if set, err = gns.Resolve(ctx, util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { return } } else { @@ -342,13 +384,17 @@ func (gns *GNSModule) GetZoneKey(path string) *ed25519.PublicKey { } // Lookup name in GNS. -func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (block *message.GNSBlock, err error) { +func (gns *GNSModule) Lookup( + ctx *service.SessionContext, + pkey *ed25519.PublicKey, + label string, + mode int) (block *message.GNSBlock, err error) { // create query (lookup key) query := NewQuery(pkey, label) // try local lookup first - if block, err = gns.LookupLocal(query); err != nil { + if block, err = gns.LookupLocal(ctx, query); err != nil { logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n", err.Error()) block = nil return @@ -356,9 +402,17 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (b if block == nil { if mode == enums.GNS_LO_DEFAULT { // get the block from a remote lookup - if block, err = gns.LookupRemote(query); err != nil || block == nil { + if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil { if err != nil { - logger.Printf(logger.ERROR, "[gns] remote Lookup: %s\n", err.Error()) + // check for aborted remote lookup: we need to cancel the query + if err == transport.ErrChannelInterrupted { + logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") + if err = gns.CancelRemote(ctx, query); err != nil { + logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) + } + } else { + logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error()) + } block = nil } else { logger.Println(logger.DBG, "[gns] remote Lookup: no block found") @@ -367,8 +421,21 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (b return } // store RRs from remote locally. - gns.StoreLocal(block) + gns.StoreLocal(ctx, block) } } return } + +// newLEHORecord creates a new supplemental GNS record of type LEHO. +func (gns *GNSModule) newLEHORecord(name string, expires util.AbsoluteTime) *message.GNSResourceRecord { + rr := new(message.GNSResourceRecord) + rr.Expires = expires + rr.Flags = uint32(enums.GNS_FLAG_SUPPL) + rr.Type = uint32(enums.GNS_TYPE_LEHO) + rr.Size = uint32(len(name) + 1) + rr.Data = make([]byte, rr.Size) + copy(rr.Data, []byte(name)) + rr.Data[len(name)] = 0 + return rr +} diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go @@ -22,7 +22,6 @@ import ( "encoding/hex" "fmt" "io" - "sync" "gnunet/config" "gnunet/crypto" @@ -60,6 +59,7 @@ func NewGNSService() service.Service { inst.LookupLocal = inst.LookupNamecache inst.StoreLocal = inst.StoreNamecache inst.LookupRemote = inst.LookupDHT + inst.CancelRemote = inst.CancelDHT return inst } @@ -74,15 +74,18 @@ func (s *GNSService) Stop() error { } // Serve a client channel. -func (s *GNSService) ServeClient(wg *sync.WaitGroup, mc *transport.MsgChannel) { - defer wg.Done() +func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { + loop: for { // receive next message from client - msg, err := mc.Receive() + logger.Printf(logger.DBG, "[gns] Waiting for message in session '%d'...\n", ctx.Id) + msg, err := mc.Receive(ctx.Signaller()) if err != nil { if err == io.EOF { logger.Println(logger.INFO, "[gns] Client channel closed.") + } else if err == transport.ErrChannelInterrupted { + logger.Println(logger.INFO, "[gns] Service operation interrupted.") } else { logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) } @@ -102,23 +105,28 @@ loop: resp = respX // perform lookup on block (locally and remote) - wg.Add(1) go func() { + ctx.Add() defer func() { // send response - if err := mc.Send(resp); err != nil { - logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) + if resp != nil { + if err := mc.Send(resp, ctx.Signaller()); err != nil { + logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) + } } // go-routine finished - wg.Done() + ctx.Remove() }() pkey := ed25519.NewPublicKeyFromBytes(m.Zone) label := m.GetName() kind := NewRRTypeList(int(m.Type)) - recset, err := s.Resolve(label, pkey, kind, int(m.Options), 0) + recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0) if err != nil { logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) + if err == transport.ErrChannelInterrupted { + resp = nil + } return } // handle records @@ -151,12 +159,16 @@ loop: break loop } } + // cancel all tasks running for this session/connection + logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", ctx.Id) + ctx.Cancel() + // close client connection mc.Close() } // LookupNamecache -func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err error) { +func (s *GNSService) LookupNamecache(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key.Bits)) // assemble Namecache request @@ -166,7 +178,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err // get response from Namecache service var resp message.Message - if resp, err = service.ServiceRequestResponse("gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { + if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { return } @@ -219,7 +231,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err } // StoreNamecache -func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { +func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block *message.GNSBlock) (err error) { logger.Println(logger.DBG, "[gns] StoreNamecache()...") // assemble Namecache request @@ -228,7 +240,7 @@ func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { // get response from Namecache service var resp message.Message - if resp, err = service.ServiceRequestResponse("gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { + if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { return } @@ -255,7 +267,7 @@ func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { } // LookupDHT -func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error) { +func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) // assemble DHT request @@ -268,7 +280,7 @@ func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error // get response from DHT service var resp message.Message - if resp, err = service.ServiceRequestResponse("gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { + if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { return } @@ -313,9 +325,36 @@ func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error // we got a result from DHT that was not in the namecache, // so store it there now. - if err = s.StoreNamecache(block); err != nil { + if err = s.StoreNamecache(ctx, block); err != nil { logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error()) } } return } + +// CancelDHT +func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err error) { + logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) + + // assemble DHT request + req := message.NewDHTClientGetStopMsg(query.Key) + req.Id = uint64(util.NextID()) + + // get response from DHT service + var resp message.Message + if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { + return + } + + // handle message depending on its type + logger.Println(logger.DBG, "[gns] Handling response from DHT service") + switch m := resp.(type) { + case *message.DHTClientResultMsg: + // check for matching IDs + if m.Id != req.Id { + logger.Println(logger.ERROR, "[gns] Got response for unknown ID") + break + } + } + return +} diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go @@ -20,6 +20,7 @@ package namecache import ( "gnunet/message" + "gnunet/service" "gnunet/service/gns" ) @@ -35,10 +36,10 @@ import ( type NamecacheModule struct { } -func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) { +func (nc *NamecacheModule) Get(ctx *service.SessionContext, query *gns.Query) (*message.GNSBlock, error) { return nil, nil } -func (nc *NamecacheModule) Put(block *message.GNSBlock) error { +func (nc *NamecacheModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { return nil } diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go @@ -33,19 +33,21 @@ import ( // Channel semantics in the specification string. type Service interface { Start(spec string) error - ServeClient(wg *sync.WaitGroup, ch *transport.MsgChannel) + ServeClient(ctx *SessionContext, ch *transport.MsgChannel) Stop() error } // ServiceImpl is an implementation of generic service functionality. type ServiceImpl struct { - impl Service - hdlr chan transport.Channel - ctrl chan bool - srvc transport.ChannelServer - wg *sync.WaitGroup - name string - running bool + impl Service // Specific service implementation + hdlr chan transport.Channel // Channel from listener + ctrl chan bool // Control channel + drop chan int // Channel to drop a session from pending list + srvc transport.ChannelServer // multi-user service + wg *sync.WaitGroup // wait group for go routine synchronization + name string // service name + running bool // service currently running? + pending map[int]*SessionContext // list of pending sessions } // NewServiceImpl instantiates a new ServiceImpl object. @@ -54,10 +56,12 @@ func NewServiceImpl(name string, srv Service) *ServiceImpl { impl: srv, hdlr: make(chan transport.Channel), ctrl: make(chan bool), + drop: make(chan int), srvc: nil, wg: new(sync.WaitGroup), name: name, running: false, + pending: make(map[int]*SessionContext), } } @@ -83,6 +87,8 @@ func (si *ServiceImpl) Start(spec string) (err error) { loop: for si.running { select { + + // handle incoming connections case in := <-si.hdlr: if in == nil { logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name) @@ -90,14 +96,38 @@ func (si *ServiceImpl) Start(spec string) (err error) { } switch ch := in.(type) { case transport.Channel: - logger.Printf(logger.INFO, "[%s] Client connected.\n", si.name) - si.wg.Add(1) - go si.impl.ServeClient(si.wg, transport.NewMsgChannel(ch)) + // run a new session with context + ctx := NewSessionContext() + sessId := ctx.Id + si.pending[sessId] = ctx + logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", si.name, sessId) + + go func() { + // serve client on the message channel + si.impl.ServeClient(ctx, transport.NewMsgChannel(ch)) + // session is done now. + logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", si.name, sessId) + si.drop <- sessId + }() } + + // handle session removal + case sessId := <-si.drop: + delete(si.pending, sessId) + + // handle cancelation signal on listener. case <-si.ctrl: break loop } } + + // terminate pending sessions + for _, ctx := range si.pending { + logger.Printf(logger.DBG, "[%s] Session '%d' closing...\n", si.name, ctx.Id) + ctx.Cancel() + } + + // close-down service logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name) si.srvc.Close() si.running = false diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go @@ -28,6 +28,7 @@ import ( "gnunet/message" "gnunet/util" + "github.com/bfix/gospel/concurrent" "github.com/bfix/gospel/data" "github.com/bfix/gospel/logger" ) @@ -36,6 +37,8 @@ import ( var ( ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") ErrChannelNotOpened = fmt.Errorf("Channel not opened") + ErrChannelInterrupted = fmt.Errorf("Channel interrupted") + ErrChannelClosed = fmt.Errorf("Channel closed") ) //////////////////////////////////////////////////////////////////////// @@ -49,10 +52,11 @@ var ( // "tcp+1.2.3.4:5" -- for TCP channels // "udp+1.2.3.4:5" -- for UDP channels type Channel interface { - Open(spec string) error - Close() error - Read([]byte) (int, error) - Write([]byte) (int, error) + Open(spec string) error // open channel (for read/write) + Close() error // close open channel + IsOpen() bool // check if channel is open + Read([]byte, *concurrent.Signaller) (int, error) // read from channel + Write([]byte, *concurrent.Signaller) (int, error) // write to channel } // ChannelFactory instantiates specific Channel imülementations. @@ -140,7 +144,12 @@ func (c *MsgChannel) Close() error { } // Send a GNUnet message over a channel. -func (c *MsgChannel) Send(msg message.Message) error { +func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error { + + // check for closed channel + if !c.ch.IsOpen() { + return ErrChannelClosed + } // convert message to binary data data, err := data.Marshal(msg) @@ -160,7 +169,7 @@ func (c *MsgChannel) Send(msg message.Message) error { } // send packet - n, err := c.ch.Write(data) + n, err := c.ch.Write(data, sig) if err != nil { return err } @@ -171,9 +180,15 @@ func (c *MsgChannel) Send(msg message.Message) error { } // Receive GNUnet messages over a plain Channel. -func (c *MsgChannel) Receive() (message.Message, error) { +func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) { + // check for closed channel + if !c.ch.IsOpen() { + return nil, ErrChannelClosed + } + + // get bytes from channel get := func(pos, count int) error { - n, err := c.ch.Read(c.buf[pos : pos+count]) + n, err := c.ch.Read(c.buf[pos:pos+count], sig) if err != nil { return err } @@ -182,6 +197,7 @@ func (c *MsgChannel) Receive() (message.Message, error) { } return nil } + if err := get(0, 4); err != nil { return nil, err } diff --git a/src/gnunet/transport/channel_netw.go b/src/gnunet/transport/channel_netw.go @@ -24,9 +24,30 @@ import ( "strconv" "strings" + "github.com/bfix/gospel/concurrent" "github.com/bfix/gospel/logger" ) +// ChannelResult for read/write operations on channels. +type ChannelResult struct { + count int // number of bytes read/written + err error // error (or nil) +} + +// NewChannelResult instanciates a new object with given attributes. +func NewChannelResult(n int, err error) *ChannelResult { + return &ChannelResult{ + count: n, + err: err, + } +} + +// Values() returns the attributes of a result instance (for passing up the +// call stack). +func (cr *ChannelResult) Values() (int, error) { + return cr.count, cr.err +} + //////////////////////////////////////////////////////////////////////// // Generic network-based Channel @@ -64,27 +85,86 @@ func (c *NetworkChannel) Open(spec string) (err error) { // Close a network channel func (c *NetworkChannel) Close() error { if c.conn != nil { - return c.conn.Close() + rc := c.conn.Close() + c.conn = nil + return rc } return ErrChannelNotOpened } +// IsOpen returns true if the channel is opened +func (c *NetworkChannel) IsOpen() bool { + return c.conn != nil +} + // Read bytes from a network channel into buffer: Returns the number of read // bytes and an error code. Only works on open channels ;) -func (c *NetworkChannel) Read(buf []byte) (int, error) { +// The read can be aborted by sending 'true' on the cmd interface; the +// channel is closed after such interruption. +func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, error) { + // check if the channel is open if c.conn == nil { return 0, ErrChannelNotOpened } - return c.conn.Read(buf) + // perform operation in go-routine + result := make(chan *ChannelResult) + go func() { + result <- NewChannelResult(c.conn.Read(buf)) + }() + + listener := sig.Listen() + defer sig.Drop(listener) + for { + select { + // handle terminate command + case x := <-listener: + switch val := x.(type) { + case bool: + if val { + c.conn.Close() + c.conn = nil + return 0, ErrChannelInterrupted + } + } + // handle result of read operation + case res := <-result: + return res.Values() + } + } } // Write buffer to a network channel: Returns the number of written bytes and -// an error code. -func (c *NetworkChannel) Write(buf []byte) (int, error) { +// an error code. The write operation can be aborted by sending 'true' on the +// command channel; the network channel is closed after such interrupt. +func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, error) { + // check if we have an open channel to write to. if c.conn == nil { return 0, ErrChannelNotOpened } - return c.conn.Write(buf) + // perform operation in go-routine + result := make(chan *ChannelResult) + go func() { + result <- NewChannelResult(c.conn.Write(buf)) + }() + + listener := sig.Listen() + defer sig.Drop(listener) + for { + select { + // handle terminate command + case x := <-listener: + switch val := x.(type) { + case bool: + if val { + c.conn.Close() + return 0, ErrChannelInterrupted + } + } + // handle result of read operation + case res := <-result: + return res.Values() + } + } } //////////////////////////////////////////////////////////////////////// diff --git a/src/gnunet/transport/connection.go b/src/gnunet/transport/connection.go @@ -21,6 +21,8 @@ package transport import ( "gnunet/core" "gnunet/message" + + "github.com/bfix/gospel/concurrent" ) // Connection for communicating peers @@ -67,11 +69,11 @@ func (c *Connection) Close() error { } // Send a message on the connection -func (c *Connection) Send(msg message.Message) error { - return c.ch.Send(msg) +func (c *Connection) Send(msg message.Message, sig *concurrent.Signaller) error { + return c.ch.Send(msg, sig) } // Receive a message on the connection -func (c *Connection) Receive() (message.Message, error) { - return c.ch.Receive() +func (c *Connection) Receive(sig *concurrent.Signaller) (message.Message, error) { + return c.ch.Receive(sig) } diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go @@ -22,6 +22,7 @@ import ( "strings" ) +// CounterMap type CounterMap map[interface{}]int func (cm CounterMap) Add(i interface{}) int { diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go @@ -78,6 +78,25 @@ func (t AbsoluteTime) Expired() bool { return t.Val < uint64(time.Now().Unix()) } +// Compare two times (-1 = (t < t2), 0 = (t == t2), 1 = (t > t2) +func (t AbsoluteTime) Compare(t2 AbsoluteTime) int { + if t.Val == math.MaxUint64 { + if t2.Val == math.MaxUint64 { + return 0 + } + return 1 + } + if t2.Val == math.MaxUint64 { + return -1 + } + if t.Val < t2.Val { + return -1 + } else if t.Val == t2.Val { + return 0 + } + return 1 +} + //---------------------------------------------------------------------- // Relative time //----------------------------------------------------------------------