aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernd Fix <brf@hoi-polloi.org>2020-03-31 13:12:30 +0200
committerBernd Fix <brf@hoi-polloi.org>2020-03-31 13:12:30 +0200
commit2a2a558aa553aca3394513114d13d4888ae822db (patch)
treecf148a1cd08f391958656ac815956118f28786fb
parentbe8e465cd061612133282db80ab1c7b7387ee251 (diff)
downloadgnunet-go-2a2a558aa553aca3394513114d13d4888ae822db.tar.gz
gnunet-go-2a2a558aa553aca3394513114d13d4888ae822db.zip
Handle DHT lookup aborts.
-rw-r--r--src/cmd/gnunet-service-gns-go/main.go2
-rw-r--r--src/cmd/peer_mockup/process.go26
-rw-r--r--src/gnunet/go.mod4
-rw-r--r--src/gnunet/go.sum5
-rw-r--r--src/gnunet/message/factory.go2
-rw-r--r--src/gnunet/message/msg_dht.go37
-rw-r--r--src/gnunet/modules.go11
-rw-r--r--src/gnunet/service/client.go35
-rw-r--r--src/gnunet/service/context.go68
-rw-r--r--src/gnunet/service/dht/module.go49
-rw-r--r--src/gnunet/service/gns/dns.go13
-rw-r--r--src/gnunet/service/gns/module.go109
-rw-r--r--src/gnunet/service/gns/service.go71
-rw-r--r--src/gnunet/service/namecache/module.go5
-rw-r--r--src/gnunet/service/service.go52
-rw-r--r--src/gnunet/transport/channel.go32
-rw-r--r--src/gnunet/transport/channel_netw.go92
-rw-r--r--src/gnunet/transport/connection.go10
-rw-r--r--src/gnunet/util/misc.go1
-rw-r--r--src/gnunet/util/time.go19
20 files changed, 547 insertions, 96 deletions
diff --git a/src/cmd/gnunet-service-gns-go/main.go b/src/cmd/gnunet-service-gns-go/main.go
index 5f4062a..2815eaa 100644
--- a/src/cmd/gnunet-service-gns-go/main.go
+++ b/src/cmd/gnunet-service-gns-go/main.go
@@ -89,6 +89,8 @@ loop:
89 break loop 89 break loop
90 case syscall.SIGHUP: 90 case syscall.SIGHUP:
91 logger.Println(logger.INFO, "[gns] SIGHUP") 91 logger.Println(logger.INFO, "[gns] SIGHUP")
92 case syscall.SIGURG:
93 // TODO: https://github.com/golang/go/issues/37942
92 default: 94 default:
93 logger.Println(logger.INFO, "[gns] Unhandled signal: "+sig.String()) 95 logger.Println(logger.INFO, "[gns] Unhandled signal: "+sig.String())
94 } 96 }
diff --git a/src/cmd/peer_mockup/process.go b/src/cmd/peer_mockup/process.go
index a26177b..510fc48 100644
--- a/src/cmd/peer_mockup/process.go
+++ b/src/cmd/peer_mockup/process.go
@@ -9,6 +9,12 @@ import (
9 "gnunet/message" 9 "gnunet/message"
10 "gnunet/transport" 10 "gnunet/transport"
11 "gnunet/util" 11 "gnunet/util"
12
13 "github.com/bfix/gospel/concurrent"
14)
15
16var (
17 sig = concurrent.NewSignaller()
12) 18)
13 19
14func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { 20func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
@@ -20,7 +26,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
20 in := make(chan message.Message) 26 in := make(chan message.Message)
21 go func() { 27 go func() {
22 for { 28 for {
23 msg, err := c.Receive() 29 msg, err := c.Receive(sig)
24 if err != nil { 30 if err != nil {
25 fmt.Printf("Receive: %s\n", err.Error()) 31 fmt.Printf("Receive: %s\n", err.Error())
26 return 32 return
@@ -33,7 +39,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
33 init := (from == p) 39 init := (from == p)
34 if init { 40 if init {
35 peerid := util.NewPeerID(p.GetID()) 41 peerid := util.NewPeerID(p.GetID())
36 c.Send(message.NewTransportTcpWelcomeMsg(peerid)) 42 c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig)
37 } 43 }
38 44
39 // remember peer addresses (only ONE!) 45 // remember peer addresses (only ONE!)
@@ -53,11 +59,11 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
53 case *message.TransportTcpWelcomeMsg: 59 case *message.TransportTcpWelcomeMsg:
54 peerid := util.NewPeerID(p.GetID()) 60 peerid := util.NewPeerID(p.GetID())
55 if init { 61 if init {
56 c.Send(message.NewHelloMsg(peerid)) 62 c.Send(message.NewHelloMsg(peerid), sig)
57 target := util.NewPeerID(t.GetID()) 63 target := util.NewPeerID(t.GetID())
58 c.Send(message.NewTransportPingMsg(target, tAddr)) 64 c.Send(message.NewTransportPingMsg(target, tAddr), sig)
59 } else { 65 } else {
60 c.Send(message.NewTransportTcpWelcomeMsg(peerid)) 66 c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig)
61 } 67 }
62 68
63 case *message.HelloMsg: 69 case *message.HelloMsg:
@@ -67,7 +73,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
67 if err := mOut.Sign(p.PrvKey()); err != nil { 73 if err := mOut.Sign(p.PrvKey()); err != nil {
68 return err 74 return err
69 } 75 }
70 c.Send(mOut) 76 c.Send(mOut, sig)
71 77
72 case *message.TransportPongMsg: 78 case *message.TransportPongMsg:
73 rc, err := msg.Verify(t.PubKey()) 79 rc, err := msg.Verify(t.PubKey())
@@ -79,14 +85,14 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
79 } 85 }
80 send[message.TRANSPORT_PONG] = true 86 send[message.TRANSPORT_PONG] = true
81 if mOut, ok := pending[message.TRANSPORT_SESSION_SYN]; ok { 87 if mOut, ok := pending[message.TRANSPORT_SESSION_SYN]; ok {
82 c.Send(mOut) 88 c.Send(mOut, sig)
83 } 89 }
84 90
85 case *message.SessionSynMsg: 91 case *message.SessionSynMsg:
86 mOut := message.NewSessionSynAckMsg() 92 mOut := message.NewSessionSynAckMsg()
87 mOut.Timestamp = msg.Timestamp 93 mOut.Timestamp = msg.Timestamp
88 if send[message.TRANSPORT_PONG] { 94 if send[message.TRANSPORT_PONG] {
89 c.Send(mOut) 95 c.Send(mOut, sig)
90 } else { 96 } else {
91 pending[message.TRANSPORT_SESSION_SYN] = mOut 97 pending[message.TRANSPORT_SESSION_SYN] = mOut
92 } 98 }
@@ -97,7 +103,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
97 case *message.SessionAckMsg: 103 case *message.SessionAckMsg:
98 104
99 case *message.SessionKeepAliveMsg: 105 case *message.SessionKeepAliveMsg:
100 c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce)) 106 c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce), sig)
101 107
102 case *message.EphemeralKeyMsg: 108 case *message.EphemeralKeyMsg:
103 rc, err := msg.Verify(t.PubKey()) 109 rc, err := msg.Verify(t.PubKey())
@@ -108,7 +114,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) {
108 return errors.New("EPHKEY verification failed") 114 return errors.New("EPHKEY verification failed")
109 } 115 }
110 t.SetEphKeyMsg(msg) 116 t.SetEphKeyMsg(msg)
111 c.Send(p.EphKeyMsg()) 117 c.Send(p.EphKeyMsg(), sig)
112 secret := crypto.SharedSecret(p.EphPrvKey(), t.EphKeyMsg().Public()) 118 secret := crypto.SharedSecret(p.EphPrvKey(), t.EphKeyMsg().Public())
113 c.SharedSecret(util.Clone(secret.Bits[:])) 119 c.SharedSecret(util.Clone(secret.Bits[:]))
114 120
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod
index 83720fd..443666f 100644
--- a/src/gnunet/go.mod
+++ b/src/gnunet/go.mod
@@ -3,7 +3,7 @@ module gnunet
3go 1.12 3go 1.12
4 4
5require ( 5require (
6 github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 6 github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46
7 github.com/miekg/dns v1.1.26 7 github.com/miekg/dns v1.1.26
8 golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 8 golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
9) 9)
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum
index e9ece97..8c956b4 100644
--- a/src/gnunet/go.sum
+++ b/src/gnunet/go.sum
@@ -1,5 +1,7 @@
1github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ= 1github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ=
2github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA= 2github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA=
3github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46 h1:5aNd1/ISbO1ltgmyUGza7kdaN4fD/Qal6uKZk9goMhw=
4github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA=
3github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= 5github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU=
4github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= 6github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
5golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 7golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -7,6 +9,8 @@ golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 h1:Gv7RPwsi3eZ2Fgewe3CBsu
7golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 9golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
8golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= 10golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=
9golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= 11golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
12golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
13golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= 14golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
11golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 15golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
12golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= 16golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
@@ -19,6 +23,7 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7w
19golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M= 23golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M=
20golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 24golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
21golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= 25golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
26golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
22golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= 27golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
23golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= 28golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
24golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= 29golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
diff --git a/src/gnunet/message/factory.go b/src/gnunet/message/factory.go
index e6e6e73..33e806b 100644
--- a/src/gnunet/message/factory.go
+++ b/src/gnunet/message/factory.go
@@ -59,6 +59,8 @@ func NewEmptyMessage(msgType uint16) (Message, error) {
59 //------------------------------------------------------------------ 59 //------------------------------------------------------------------
60 case DHT_CLIENT_GET: 60 case DHT_CLIENT_GET:
61 return NewDHTClientGetMsg(nil), nil 61 return NewDHTClientGetMsg(nil), nil
62 case DHT_CLIENT_GET_STOP:
63 return NewDHTClientGetStopMsg(nil), nil
62 case DHT_CLIENT_RESULT: 64 case DHT_CLIENT_RESULT:
63 return NewDHTClientResultMsg(nil), nil 65 return NewDHTClientResultMsg(nil), nil
64 66
diff --git a/src/gnunet/message/msg_dht.go b/src/gnunet/message/msg_dht.go
index 925cb71..62a233f 100644
--- a/src/gnunet/message/msg_dht.go
+++ b/src/gnunet/message/msg_dht.go
@@ -126,3 +126,40 @@ func (m *DHTClientResultMsg) String() string {
126func (msg *DHTClientResultMsg) Header() *MessageHeader { 126func (msg *DHTClientResultMsg) Header() *MessageHeader {
127 return &MessageHeader{msg.MsgSize, msg.MsgType} 127 return &MessageHeader{msg.MsgSize, msg.MsgType}
128} 128}
129
130//----------------------------------------------------------------------
131// DHT_CLIENT_GET_STOP
132//----------------------------------------------------------------------
133
134// DHTClientGetStopMsg
135type DHTClientGetStopMsg struct {
136 MsgSize uint16 `order:"big"` // total size of message
137 MsgType uint16 `order:"big"` // DHT_CLIENT_GET_STOP (144)
138 Reserved uint32 `order:"big"` // Reserved (0)
139 Id uint64 `order:"big"` // Unique ID identifying this request
140 Key *crypto.HashCode // The key to search for
141}
142
143// NewDHTClientGetStopMsg creates a new default DHTClientGetStopMsg object.
144func NewDHTClientGetStopMsg(key *crypto.HashCode) *DHTClientGetStopMsg {
145 if key == nil {
146 key = new(crypto.HashCode)
147 }
148 return &DHTClientGetStopMsg{
149 MsgSize: 80,
150 MsgType: DHT_CLIENT_GET_STOP,
151 Reserved: 0, // mandatory
152 Id: 0,
153 Key: key,
154 }
155}
156
157// String returns a human-readable representation of the message.
158func (m *DHTClientGetStopMsg) String() string {
159 return fmt.Sprintf("DHTClientGetStopMsg{Id:%d,Key=%s}", m.Id, hex.EncodeToString(m.Key.Bits))
160}
161
162// Header returns the message header in a separate instance.
163func (msg *DHTClientGetStopMsg) Header() *MessageHeader {
164 return &MessageHeader{msg.MsgSize, msg.MsgType}
165}
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
index bc2993e..b71eb40 100644
--- a/src/gnunet/modules.go
+++ b/src/gnunet/modules.go
@@ -29,6 +29,7 @@
29package gnunet 29package gnunet
30 30
31import ( 31import (
32 "gnunet/service/dht"
32 "gnunet/service/gns" 33 "gnunet/service/gns"
33 "gnunet/service/namecache" 34 "gnunet/service/namecache"
34) 35)
@@ -37,6 +38,7 @@ import (
37type Instances struct { 38type Instances struct {
38 GNS *gns.GNSModule 39 GNS *gns.GNSModule
39 Namecache *namecache.NamecacheModule 40 Namecache *namecache.NamecacheModule
41 DHT *dht.DHTModule
40} 42}
41 43
42// Local reference to instance list 44// Local reference to instance list
@@ -50,9 +52,14 @@ func init() {
50 // Namecache (no calls to other modules) 52 // Namecache (no calls to other modules)
51 Modules.Namecache = new(namecache.NamecacheModule) 53 Modules.Namecache = new(namecache.NamecacheModule)
52 54
55 // DHT (no calls to other modules)
56 Modules.DHT = new(dht.DHTModule)
57
53 // GNS (calls Namecache, DHT and Identity) 58 // GNS (calls Namecache, DHT and Identity)
54 Modules.GNS = &gns.GNSModule{ 59 Modules.GNS = &gns.GNSModule{
55 LookupLocal: Modules.Namecache.Get, 60 LookupLocal: Modules.Namecache.Get,
56 StoreLocal: Modules.Namecache.Put, 61 StoreLocal: Modules.Namecache.Put,
62 LookupRemote: Modules.DHT.Get,
63 CancelRemote: Modules.DHT.Cancel,
57 } 64 }
58} 65}
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go
index 1e54c2d..fe3fa9e 100644
--- a/src/gnunet/service/client.go
+++ b/src/gnunet/service/client.go
@@ -25,36 +25,49 @@ import (
25 "github.com/bfix/gospel/logger" 25 "github.com/bfix/gospel/logger"
26) 26)
27 27
28// Client 28// Client type: Use to perform client-side interactions with GNUnet services.
29type Client struct { 29type Client struct {
30 ch *transport.MsgChannel 30 ch *transport.MsgChannel // channel for message exchange
31} 31}
32 32
33// NewClient 33// NewClient creates a new client instance for the given channel endpoint.
34func NewClient(endp string) (*Client, error) { 34func NewClient(endp string) (*Client, error) {
35 // 35 // create a new channel to endpoint.
36 ch, err := transport.NewChannel(endp) 36 ch, err := transport.NewChannel(endp)
37 if err != nil { 37 if err != nil {
38 return nil, err 38 return nil, err
39 } 39 }
40 // wrap into a message channel for the client.
40 return &Client{ 41 return &Client{
41 ch: transport.NewMsgChannel(ch), 42 ch: transport.NewMsgChannel(ch),
42 }, nil 43 }, nil
43} 44}
44 45
45func (c *Client) SendRequest(req message.Message) error { 46// SendRequest sends a give message to the service.
46 return c.ch.Send(req) 47func (c *Client) SendRequest(ctx *SessionContext, req message.Message) error {
48 return c.ch.Send(req, ctx.Signaller())
47} 49}
48 50
49func (c *Client) ReceiveResponse() (message.Message, error) { 51// ReceiveResponse waits for a response from the service; it can be interrupted
50 return c.ch.Receive() 52// by sending "false" to the cmd channel.
53func (c *Client) ReceiveResponse(ctx *SessionContext) (message.Message, error) {
54 return c.ch.Receive(ctx.Signaller())
51} 55}
52 56
57// Close a client; no further message exchange is possible.
53func (c *Client) Close() error { 58func (c *Client) Close() error {
54 return c.ch.Close() 59 return c.ch.Close()
55} 60}
56 61
57func ServiceRequestResponse(caller, callee, endp string, req message.Message) (message.Message, error) { 62// ServiceRequestResponse is a helper method for a one request - one response
63// secenarios of client/serice interactions.
64func ServiceRequestResponse(
65 ctx *SessionContext,
66 caller string,
67 callee string,
68 endp string,
69 req message.Message) (message.Message, error) {
70
58 // client-connect to the service 71 // client-connect to the service
59 logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) 72 logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee)
60 cl, err := NewClient(endp) 73 cl, err := NewClient(endp)
@@ -63,13 +76,13 @@ func ServiceRequestResponse(caller, callee, endp string, req message.Message) (m
63 } 76 }
64 // send request 77 // send request
65 logger.Printf(logger.DBG, "[%s] Sending request to %s service\n", caller, callee) 78 logger.Printf(logger.DBG, "[%s] Sending request to %s service\n", caller, callee)
66 if err = cl.SendRequest(req); err != nil { 79 if err = cl.SendRequest(ctx, req); err != nil {
67 return nil, err 80 return nil, err
68 } 81 }
69 // wait for a single response, then close the connection 82 // wait for a single response, then close the connection
70 logger.Printf(logger.DBG, "[%s] Waiting for response from %s service\n", caller, callee) 83 logger.Printf(logger.DBG, "[%s] Waiting for response from %s service\n", caller, callee)
71 var resp message.Message 84 var resp message.Message
72 if resp, err = cl.ReceiveResponse(); err != nil { 85 if resp, err = cl.ReceiveResponse(ctx); err != nil {
73 return nil, err 86 return nil, err
74 } 87 }
75 logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n", caller, callee) 88 logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n", caller, callee)
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go
new file mode 100644
index 0000000..4896bd5
--- /dev/null
+++ b/src/gnunet/service/context.go
@@ -0,0 +1,68 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019, 2020 Bernd Fix >Y<
3//
4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// gnunet-go is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <http://www.gnu.org/licenses/>.
16//
17// SPDX-License-Identifier: AGPL3.0-or-later
18
19package service
20
21import (
22 "sync"
23
24 "gnunet/util"
25
26 "github.com/bfix/gospel/concurrent"
27)
28
29// SessionContext is used to set a context for each client connection handled
30// by a service; the session is handled by the 'ServeClient' method of a
31// service implementation.
32type SessionContext struct {
33 Id int // session identifier
34 wg *sync.WaitGroup // wait group for the session
35 sig *concurrent.Signaller // signaller for the session
36}
37
38// NewSessionContext instantiates a new session context.
39func NewSessionContext() *SessionContext {
40 return &SessionContext{
41 Id: util.NextID(),
42 wg: new(sync.WaitGroup),
43 sig: concurrent.NewSignaller(),
44 }
45}
46
47// Cancel all go-routines associated with this context.
48func (ctx *SessionContext) Cancel() {
49 // send signal to terminate...
50 ctx.sig.Send(true)
51 // wait for session go-routines to finish
52 ctx.wg.Wait()
53}
54
55// Add a go-routine to the wait group.
56func (ctx *SessionContext) Add() {
57 ctx.wg.Add(1)
58}
59
60// Remove a go-routine from the wait group.
61func (ctx *SessionContext) Remove() {
62 ctx.wg.Done()
63}
64
65// Signaller returns the working instance for the context.
66func (ctx *SessionContext) Signaller() *concurrent.Signaller {
67 return ctx.sig
68}
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
new file mode 100644
index 0000000..a54d0eb
--- /dev/null
+++ b/src/gnunet/service/dht/module.go
@@ -0,0 +1,49 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019, 2020 Bernd Fix >Y<
3//
4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// gnunet-go is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <http://www.gnu.org/licenses/>.
16//
17// SPDX-License-Identifier: AGPL3.0-or-later
18
19package dht
20
21import (
22 "gnunet/message"
23 "gnunet/service"
24 "gnunet/service/gns"
25)
26
27//======================================================================
28// "DHT" implementation
29//======================================================================
30
31//----------------------------------------------------------------------
32// Put and get blocks into/from a DHT.
33//----------------------------------------------------------------------
34
35// DHT handles the permanent storage of blocks under the query key.
36type DHTModule struct {
37}
38
39func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) (*message.GNSBlock, error) {
40 return nil, nil
41}
42
43func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) error {
44 return nil
45}
46
47func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error {
48 return nil
49}
diff --git a/src/gnunet/service/gns/dns.go b/src/gnunet/service/gns/dns.go
index 28e9813..7a9a30c 100644
--- a/src/gnunet/service/gns/dns.go
+++ b/src/gnunet/service/gns/dns.go
@@ -26,6 +26,7 @@ import (
26 26
27 "gnunet/enums" 27 "gnunet/enums"
28 "gnunet/message" 28 "gnunet/message"
29 "gnunet/service"
29 "gnunet/util" 30 "gnunet/util"
30 31
31 "github.com/bfix/gospel/crypto/ed25519" 32 "github.com/bfix/gospel/crypto/ed25519"
@@ -202,10 +203,16 @@ func QueryDNS(id int, name string, server net.IP, kind RRTypeList) *message.GNSR
202// ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in 203// ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in
203// parallel; the first result delivered by any of the servers is returned 204// parallel; the first result delivered by any of the servers is returned
204// as the result list of matching resource records. 205// as the result list of matching resource records.
205func (gns *GNSModule) ResolveDNS(name string, servers []string, kind RRTypeList, pkey *ed25519.PublicKey, depth int) (set *message.GNSRecordSet, err error) { 206func (gns *GNSModule) ResolveDNS(
206 logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", name) 207 ctx *service.SessionContext,
208 name string,
209 servers []string,
210 kind RRTypeList,
211 pkey *ed25519.PublicKey,
212 depth int) (set *message.GNSRecordSet, err error) {
207 213
208 // start DNS queries concurrently 214 // start DNS queries concurrently
215 logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", name)
209 res := make(chan *message.GNSRecordSet) 216 res := make(chan *message.GNSRecordSet)
210 running := 0 217 running := 0
211 for _, srv := range servers { 218 for _, srv := range servers {
@@ -215,7 +222,7 @@ func (gns *GNSModule) ResolveDNS(name string, servers []string, kind RRTypeList,
215 if addr == nil { 222 if addr == nil {
216 // no, it is a name... try to resolve an IP address from the name 223 // no, it is a name... try to resolve an IP address from the name
217 query := NewRRTypeList(enums.GNS_TYPE_DNS_A, enums.GNS_TYPE_DNS_AAAA) 224 query := NewRRTypeList(enums.GNS_TYPE_DNS_A, enums.GNS_TYPE_DNS_AAAA)
218 if set, err = gns.ResolveUnknown(srv, nil, pkey, query, depth+1); err != nil { 225 if set, err = gns.ResolveUnknown(ctx, srv, nil, pkey, query, depth+1); err != nil {
219 logger.Printf(logger.ERROR, "[dns] Can't resolve NS server '%s': %s\n", srv, err.Error()) 226 logger.Printf(logger.ERROR, "[dns] Can't resolve NS server '%s': %s\n", srv, err.Error())
220 continue 227 continue
221 } 228 }
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index d067633..43f7b7a 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -26,6 +26,8 @@ import (
26 "gnunet/crypto" 26 "gnunet/crypto"
27 "gnunet/enums" 27 "gnunet/enums"
28 "gnunet/message" 28 "gnunet/message"
29 "gnunet/service"
30 "gnunet/transport"
29 "gnunet/util" 31 "gnunet/util"
30 32
31 "github.com/bfix/gospel/crypto/ed25519" 33 "github.com/bfix/gospel/crypto/ed25519"
@@ -110,15 +112,22 @@ func NewQuery(pkey *ed25519.PublicKey, label string) *Query {
110// GNSModule handles the resolution of GNS names to RRs bundled in a block. 112// GNSModule handles the resolution of GNS names to RRs bundled in a block.
111type GNSModule struct { 113type GNSModule struct {
112 // Use function references for calls to methods in other modules: 114 // Use function references for calls to methods in other modules:
113 // 115 LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error)
114 LookupLocal func(query *Query) (*message.GNSBlock, error) 116 StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error
115 StoreLocal func(block *message.GNSBlock) error 117 LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error)
116 LookupRemote func(query *Query) (*message.GNSBlock, error) 118 CancelRemote func(ctx *service.SessionContext, query *Query) error
117} 119}
118 120
119// Resolve a GNS name with multiple labels. If pkey is not nil, the name 121// Resolve a GNS name with multiple labels. If pkey is not nil, the name
120// is interpreted as "relative to current zone". 122// is interpreted as "relative to current zone".
121func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { 123func (gns *GNSModule) Resolve(
124 ctx *service.SessionContext,
125 path string,
126 pkey *ed25519.PublicKey,
127 kind RRTypeList,
128 mode int,
129 depth int) (set *message.GNSRecordSet, err error) {
130
122 // check for recursion depth 131 // check for recursion depth
123 if depth > config.Cfg.GNS.MaxDepth { 132 if depth > config.Cfg.GNS.MaxDepth {
124 return nil, ErrGNSRecursionExceeded 133 return nil, ErrGNSRecursionExceeded
@@ -130,14 +139,20 @@ func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind RRTypeL
130 // check for relative path 139 // check for relative path
131 if pkey != nil { 140 if pkey != nil {
132 //resolve relative path 141 //resolve relative path
133 return gns.ResolveRelative(names, pkey, kind, mode, depth) 142 return gns.ResolveRelative(ctx, names, pkey, kind, mode, depth)
134 } 143 }
135 // resolve absolute path 144 // resolve absolute path
136 return gns.ResolveAbsolute(names, kind, mode, depth) 145 return gns.ResolveAbsolute(ctx, names, kind, mode, depth)
137} 146}
138 147
139// Resolve a fully qualified GNS absolute name (with multiple labels). 148// Resolve a fully qualified GNS absolute name (with multiple labels).
140func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { 149func (gns *GNSModule) ResolveAbsolute(
150 ctx *service.SessionContext,
151 labels []string,
152 kind RRTypeList,
153 mode int,
154 depth int) (set *message.GNSRecordSet, err error) {
155
141 // get the zone key for the TLD 156 // get the zone key for the TLD
142 pkey := gns.GetZoneKey(labels[0]) 157 pkey := gns.GetZoneKey(labels[0])
143 if pkey == nil { 158 if pkey == nil {
@@ -146,12 +161,19 @@ func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode int
146 return 161 return
147 } 162 }
148 // continue with resolution relative to a zone. 163 // continue with resolution relative to a zone.
149 return gns.ResolveRelative(labels[1:], pkey, kind, mode, depth) 164 return gns.ResolveRelative(ctx, labels[1:], pkey, kind, mode, depth)
150} 165}
151 166
152// Resolve relative path (to a given zone) recursively by processing simple 167// Resolve relative path (to a given zone) recursively by processing simple
153// (PKEY,Label) lookups in sequence and handle intermediate GNS record types 168// (PKEY,Label) lookups in sequence and handle intermediate GNS record types
154func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { 169func (gns *GNSModule) ResolveRelative(
170 ctx *service.SessionContext,
171 labels []string,
172 pkey *ed25519.PublicKey,
173 kind RRTypeList,
174 mode int,
175 depth int) (set *message.GNSRecordSet, err error) {
176
155 // Process all names in sequence 177 // Process all names in sequence
156 var ( 178 var (
157 records []*message.GNSResourceRecord // final resource records from resolution 179 records []*message.GNSResourceRecord // final resource records from resolution
@@ -162,7 +184,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey,
162 184
163 // resolve next level 185 // resolve next level
164 var block *message.GNSBlock 186 var block *message.GNSBlock
165 if block, err = gns.Lookup(pkey, labels[0], mode); err != nil { 187 if block, err = gns.Lookup(ctx, pkey, labels[0], mode); err != nil {
166 // failed to resolve name 188 // failed to resolve name
167 return 189 return
168 } 190 }
@@ -225,10 +247,23 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey,
225 lbls += "." 247 lbls += "."
226 } 248 }
227 fqdn := lbls + inst.Query 249 fqdn := lbls + inst.Query
228 if set, err = gns.ResolveDNS(fqdn, inst.Servers, kind, pkey, depth); err != nil { 250 if set, err = gns.ResolveDNS(ctx, fqdn, inst.Servers, kind, pkey, depth); err != nil {
229 logger.Println(logger.ERROR, "[gns] GNS2DNS resolution failed.") 251 logger.Println(logger.ERROR, "[gns] GNS2DNS resolution failed.")
230 return 252 return
231 } 253 }
254 // add synthetic LEHO record if we have results and are at the
255 // end of the name (labels).
256 if len(set.Records) > 0 && len(labels) == 1 {
257 // add LEHO supplemental record: The TTL of the new record is
258 // the longest-living record in the current set.
259 expires := util.AbsoluteTimeNow()
260 for _, rec := range set.Records {
261 if rec.Expires.Compare(expires) > 0 {
262 expires = rec.Expires
263 }
264 }
265 set.Records = append(set.Records, gns.newLEHORecord(inst.Query, expires))
266 }
232 // we are done with resolution; pass on records to caller 267 // we are done with resolution; pass on records to caller
233 records = set.Records 268 records = set.Records
234 break 269 break
@@ -250,7 +285,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey,
250 break 285 break
251 } 286 }
252 logger.Println(logger.DBG, "[gns] CNAME resolution required.") 287 logger.Println(logger.DBG, "[gns] CNAME resolution required.")
253 if set, err = gns.ResolveUnknown(inst.name, labels, pkey, kind, depth+1); err != nil { 288 if set, err = gns.ResolveUnknown(ctx, inst.name, labels, pkey, kind, depth+1); err != nil {
254 logger.Println(logger.ERROR, "[gns] CNAME resolution failed.") 289 logger.Println(logger.ERROR, "[gns] CNAME resolution failed.")
255 return 290 return
256 } 291 }
@@ -300,7 +335,14 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey,
300// relative to the zone PKEY. If the name is an absolute GNS name (ending in 335// relative to the zone PKEY. If the name is an absolute GNS name (ending in
301// a PKEY TLD), it is also resolved with GNS. All other names are resolved 336// a PKEY TLD), it is also resolved with GNS. All other names are resolved
302// via DNS queries. 337// via DNS queries.
303func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey *ed25519.PublicKey, kind RRTypeList, depth int) (set *message.GNSRecordSet, err error) { 338func (gns *GNSModule) ResolveUnknown(
339 ctx *service.SessionContext,
340 name string,
341 labels []string,
342 pkey *ed25519.PublicKey,
343 kind RRTypeList,
344 depth int) (set *message.GNSRecordSet, err error) {
345
304 // relative GNS-based server name? 346 // relative GNS-based server name?
305 if strings.HasSuffix(name, ".+") { 347 if strings.HasSuffix(name, ".+") {
306 // resolve server name relative to current zone 348 // resolve server name relative to current zone
@@ -308,14 +350,14 @@ func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey *ed25519
308 for _, label := range util.ReverseStringList(labels) { 350 for _, label := range util.ReverseStringList(labels) {
309 name += "." + label 351 name += "." + label
310 } 352 }
311 if set, err = gns.Resolve(name, pkey, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { 353 if set, err = gns.Resolve(ctx, name, pkey, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil {
312 return 354 return
313 } 355 }
314 } else { 356 } else {
315 // check for absolute GNS name (with PKEY as TLD) 357 // check for absolute GNS name (with PKEY as TLD)
316 if zk := gns.GetZoneKey(name); zk != nil { 358 if zk := gns.GetZoneKey(name); zk != nil {
317 // resolve absolute GNS name (name ends in a PKEY) 359 // resolve absolute GNS name (name ends in a PKEY)
318 if set, err = gns.Resolve(util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { 360 if set, err = gns.Resolve(ctx, util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil {
319 return 361 return
320 } 362 }
321 } else { 363 } else {
@@ -342,13 +384,17 @@ func (gns *GNSModule) GetZoneKey(path string) *ed25519.PublicKey {
342} 384}
343 385
344// Lookup name in GNS. 386// Lookup name in GNS.
345func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (block *message.GNSBlock, err error) { 387func (gns *GNSModule) Lookup(
388 ctx *service.SessionContext,
389 pkey *ed25519.PublicKey,
390 label string,
391 mode int) (block *message.GNSBlock, err error) {
346 392
347 // create query (lookup key) 393 // create query (lookup key)
348 query := NewQuery(pkey, label) 394 query := NewQuery(pkey, label)
349 395
350 // try local lookup first 396 // try local lookup first
351 if block, err = gns.LookupLocal(query); err != nil { 397 if block, err = gns.LookupLocal(ctx, query); err != nil {
352 logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n", err.Error()) 398 logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n", err.Error())
353 block = nil 399 block = nil
354 return 400 return
@@ -356,9 +402,17 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (b
356 if block == nil { 402 if block == nil {
357 if mode == enums.GNS_LO_DEFAULT { 403 if mode == enums.GNS_LO_DEFAULT {
358 // get the block from a remote lookup 404 // get the block from a remote lookup
359 if block, err = gns.LookupRemote(query); err != nil || block == nil { 405 if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil {
360 if err != nil { 406 if err != nil {
361 logger.Printf(logger.ERROR, "[gns] remote Lookup: %s\n", err.Error()) 407 // check for aborted remote lookup: we need to cancel the query
408 if err == transport.ErrChannelInterrupted {
409 logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.")
410 if err = gns.CancelRemote(ctx, query); err != nil {
411 logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error())
412 }
413 } else {
414 logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error())
415 }
362 block = nil 416 block = nil
363 } else { 417 } else {
364 logger.Println(logger.DBG, "[gns] remote Lookup: no block found") 418 logger.Println(logger.DBG, "[gns] remote Lookup: no block found")
@@ -367,8 +421,21 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (b
367 return 421 return
368 } 422 }
369 // store RRs from remote locally. 423 // store RRs from remote locally.
370 gns.StoreLocal(block) 424 gns.StoreLocal(ctx, block)
371 } 425 }
372 } 426 }
373 return 427 return
374} 428}
429
430// newLEHORecord creates a new supplemental GNS record of type LEHO.
431func (gns *GNSModule) newLEHORecord(name string, expires util.AbsoluteTime) *message.GNSResourceRecord {
432 rr := new(message.GNSResourceRecord)
433 rr.Expires = expires
434 rr.Flags = uint32(enums.GNS_FLAG_SUPPL)
435 rr.Type = uint32(enums.GNS_TYPE_LEHO)
436 rr.Size = uint32(len(name) + 1)
437 rr.Data = make([]byte, rr.Size)
438 copy(rr.Data, []byte(name))
439 rr.Data[len(name)] = 0
440 return rr
441}
diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go
index 1a62e77..2526ae8 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -22,7 +22,6 @@ import (
22 "encoding/hex" 22 "encoding/hex"
23 "fmt" 23 "fmt"
24 "io" 24 "io"
25 "sync"
26 25
27 "gnunet/config" 26 "gnunet/config"
28 "gnunet/crypto" 27 "gnunet/crypto"
@@ -60,6 +59,7 @@ func NewGNSService() service.Service {
60 inst.LookupLocal = inst.LookupNamecache 59 inst.LookupLocal = inst.LookupNamecache
61 inst.StoreLocal = inst.StoreNamecache 60 inst.StoreLocal = inst.StoreNamecache
62 inst.LookupRemote = inst.LookupDHT 61 inst.LookupRemote = inst.LookupDHT
62 inst.CancelRemote = inst.CancelDHT
63 return inst 63 return inst
64} 64}
65 65
@@ -74,15 +74,18 @@ func (s *GNSService) Stop() error {
74} 74}
75 75
76// Serve a client channel. 76// Serve a client channel.
77func (s *GNSService) ServeClient(wg *sync.WaitGroup, mc *transport.MsgChannel) { 77func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) {
78 defer wg.Done() 78
79loop: 79loop:
80 for { 80 for {
81 // receive next message from client 81 // receive next message from client
82 msg, err := mc.Receive() 82 logger.Printf(logger.DBG, "[gns] Waiting for message in session '%d'...\n", ctx.Id)
83 msg, err := mc.Receive(ctx.Signaller())
83 if err != nil { 84 if err != nil {
84 if err == io.EOF { 85 if err == io.EOF {
85 logger.Println(logger.INFO, "[gns] Client channel closed.") 86 logger.Println(logger.INFO, "[gns] Client channel closed.")
87 } else if err == transport.ErrChannelInterrupted {
88 logger.Println(logger.INFO, "[gns] Service operation interrupted.")
86 } else { 89 } else {
87 logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) 90 logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error())
88 } 91 }
@@ -102,23 +105,28 @@ loop:
102 resp = respX 105 resp = respX
103 106
104 // perform lookup on block (locally and remote) 107 // perform lookup on block (locally and remote)
105 wg.Add(1)
106 go func() { 108 go func() {
109 ctx.Add()
107 defer func() { 110 defer func() {
108 // send response 111 // send response
109 if err := mc.Send(resp); err != nil { 112 if resp != nil {
110 logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) 113 if err := mc.Send(resp, ctx.Signaller()); err != nil {
114 logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error())
115 }
111 } 116 }
112 // go-routine finished 117 // go-routine finished
113 wg.Done() 118 ctx.Remove()
114 }() 119 }()
115 120
116 pkey := ed25519.NewPublicKeyFromBytes(m.Zone) 121 pkey := ed25519.NewPublicKeyFromBytes(m.Zone)
117 label := m.GetName() 122 label := m.GetName()
118 kind := NewRRTypeList(int(m.Type)) 123 kind := NewRRTypeList(int(m.Type))
119 recset, err := s.Resolve(label, pkey, kind, int(m.Options), 0) 124 recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0)
120 if err != nil { 125 if err != nil {
121 logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) 126 logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error())
127 if err == transport.ErrChannelInterrupted {
128 resp = nil
129 }
122 return 130 return
123 } 131 }
124 // handle records 132 // handle records
@@ -151,12 +159,16 @@ loop:
151 break loop 159 break loop
152 } 160 }
153 } 161 }
162 // cancel all tasks running for this session/connection
163 logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", ctx.Id)
164 ctx.Cancel()
165
154 // close client connection 166 // close client connection
155 mc.Close() 167 mc.Close()
156} 168}
157 169
158// LookupNamecache 170// LookupNamecache
159func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err error) { 171func (s *GNSService) LookupNamecache(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) {
160 logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key.Bits)) 172 logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key.Bits))
161 173
162 // assemble Namecache request 174 // assemble Namecache request
@@ -166,7 +178,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err
166 178
167 // get response from Namecache service 179 // get response from Namecache service
168 var resp message.Message 180 var resp message.Message
169 if resp, err = service.ServiceRequestResponse("gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { 181 if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil {
170 return 182 return
171 } 183 }
172 184
@@ -219,7 +231,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err
219} 231}
220 232
221// StoreNamecache 233// StoreNamecache
222func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { 234func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block *message.GNSBlock) (err error) {
223 logger.Println(logger.DBG, "[gns] StoreNamecache()...") 235 logger.Println(logger.DBG, "[gns] StoreNamecache()...")
224 236
225 // assemble Namecache request 237 // assemble Namecache request
@@ -228,7 +240,7 @@ func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) {
228 240
229 // get response from Namecache service 241 // get response from Namecache service
230 var resp message.Message 242 var resp message.Message
231 if resp, err = service.ServiceRequestResponse("gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { 243 if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil {
232 return 244 return
233 } 245 }
234 246
@@ -255,7 +267,7 @@ func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) {
255} 267}
256 268
257// LookupDHT 269// LookupDHT
258func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error) { 270func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) {
259 logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) 271 logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits))
260 272
261 // assemble DHT request 273 // assemble DHT request
@@ -268,7 +280,7 @@ func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error
268 280
269 // get response from DHT service 281 // get response from DHT service
270 var resp message.Message 282 var resp message.Message
271 if resp, err = service.ServiceRequestResponse("gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { 283 if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil {
272 return 284 return
273 } 285 }
274 286
@@ -313,9 +325,36 @@ func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error
313 325
314 // we got a result from DHT that was not in the namecache, 326 // we got a result from DHT that was not in the namecache,
315 // so store it there now. 327 // so store it there now.
316 if err = s.StoreNamecache(block); err != nil { 328 if err = s.StoreNamecache(ctx, block); err != nil {
317 logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error()) 329 logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error())
318 } 330 }
319 } 331 }
320 return 332 return
321} 333}
334
335// CancelDHT
336func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err error) {
337 logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", hex.EncodeToString(query.Key.Bits))
338
339 // assemble DHT request
340 req := message.NewDHTClientGetStopMsg(query.Key)
341 req.Id = uint64(util.NextID())
342
343 // get response from DHT service
344 var resp message.Message
345 if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil {
346 return
347 }
348
349 // handle message depending on its type
350 logger.Println(logger.DBG, "[gns] Handling response from DHT service")
351 switch m := resp.(type) {
352 case *message.DHTClientResultMsg:
353 // check for matching IDs
354 if m.Id != req.Id {
355 logger.Println(logger.ERROR, "[gns] Got response for unknown ID")
356 break
357 }
358 }
359 return
360}
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go
index 6a10625..a205444 100644
--- a/src/gnunet/service/namecache/module.go
+++ b/src/gnunet/service/namecache/module.go
@@ -20,6 +20,7 @@ package namecache
20 20
21import ( 21import (
22 "gnunet/message" 22 "gnunet/message"
23 "gnunet/service"
23 "gnunet/service/gns" 24 "gnunet/service/gns"
24) 25)
25 26
@@ -35,10 +36,10 @@ import (
35type NamecacheModule struct { 36type NamecacheModule struct {
36} 37}
37 38
38func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) { 39func (nc *NamecacheModule) Get(ctx *service.SessionContext, query *gns.Query) (*message.GNSBlock, error) {
39 return nil, nil 40 return nil, nil
40} 41}
41 42
42func (nc *NamecacheModule) Put(block *message.GNSBlock) error { 43func (nc *NamecacheModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error {
43 return nil 44 return nil
44} 45}
diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go
index d484a95..1017e0b 100644
--- a/src/gnunet/service/service.go
+++ b/src/gnunet/service/service.go
@@ -33,19 +33,21 @@ import (
33// Channel semantics in the specification string. 33// Channel semantics in the specification string.
34type Service interface { 34type Service interface {
35 Start(spec string) error 35 Start(spec string) error
36 ServeClient(wg *sync.WaitGroup, ch *transport.MsgChannel) 36 ServeClient(ctx *SessionContext, ch *transport.MsgChannel)
37 Stop() error 37 Stop() error
38} 38}
39 39
40// ServiceImpl is an implementation of generic service functionality. 40// ServiceImpl is an implementation of generic service functionality.
41type ServiceImpl struct { 41type ServiceImpl struct {
42 impl Service 42 impl Service // Specific service implementation
43 hdlr chan transport.Channel 43 hdlr chan transport.Channel // Channel from listener
44 ctrl chan bool 44 ctrl chan bool // Control channel
45 srvc transport.ChannelServer 45 drop chan int // Channel to drop a session from pending list
46 wg *sync.WaitGroup 46 srvc transport.ChannelServer // multi-user service
47 name string 47 wg *sync.WaitGroup // wait group for go routine synchronization
48 running bool 48 name string // service name
49 running bool // service currently running?
50 pending map[int]*SessionContext // list of pending sessions
49} 51}
50 52
51// NewServiceImpl instantiates a new ServiceImpl object. 53// NewServiceImpl instantiates a new ServiceImpl object.
@@ -54,10 +56,12 @@ func NewServiceImpl(name string, srv Service) *ServiceImpl {
54 impl: srv, 56 impl: srv,
55 hdlr: make(chan transport.Channel), 57 hdlr: make(chan transport.Channel),
56 ctrl: make(chan bool), 58 ctrl: make(chan bool),
59 drop: make(chan int),
57 srvc: nil, 60 srvc: nil,
58 wg: new(sync.WaitGroup), 61 wg: new(sync.WaitGroup),
59 name: name, 62 name: name,
60 running: false, 63 running: false,
64 pending: make(map[int]*SessionContext),
61 } 65 }
62} 66}
63 67
@@ -83,6 +87,8 @@ func (si *ServiceImpl) Start(spec string) (err error) {
83 loop: 87 loop:
84 for si.running { 88 for si.running {
85 select { 89 select {
90
91 // handle incoming connections
86 case in := <-si.hdlr: 92 case in := <-si.hdlr:
87 if in == nil { 93 if in == nil {
88 logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name) 94 logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name)
@@ -90,14 +96,38 @@ func (si *ServiceImpl) Start(spec string) (err error) {
90 } 96 }
91 switch ch := in.(type) { 97 switch ch := in.(type) {
92 case transport.Channel: 98 case transport.Channel:
93 logger.Printf(logger.INFO, "[%s] Client connected.\n", si.name) 99 // run a new session with context
94 si.wg.Add(1) 100 ctx := NewSessionContext()
95 go si.impl.ServeClient(si.wg, transport.NewMsgChannel(ch)) 101 sessId := ctx.Id
102 si.pending[sessId] = ctx
103 logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", si.name, sessId)
104
105 go func() {
106 // serve client on the message channel
107 si.impl.ServeClient(ctx, transport.NewMsgChannel(ch))
108 // session is done now.
109 logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", si.name, sessId)
110 si.drop <- sessId
111 }()
96 } 112 }
113
114 // handle session removal
115 case sessId := <-si.drop:
116 delete(si.pending, sessId)
117
118 // handle cancelation signal on listener.
97 case <-si.ctrl: 119 case <-si.ctrl:
98 break loop 120 break loop
99 } 121 }
100 } 122 }
123
124 // terminate pending sessions
125 for _, ctx := range si.pending {
126 logger.Printf(logger.DBG, "[%s] Session '%d' closing...\n", si.name, ctx.Id)
127 ctx.Cancel()
128 }
129
130 // close-down service
101 logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name) 131 logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name)
102 si.srvc.Close() 132 si.srvc.Close()
103 si.running = false 133 si.running = false
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go
index 092ab8a..4ba49ac 100644
--- a/src/gnunet/transport/channel.go
+++ b/src/gnunet/transport/channel.go
@@ -28,6 +28,7 @@ import (
28 "gnunet/message" 28 "gnunet/message"
29 "gnunet/util" 29 "gnunet/util"
30 30
31 "github.com/bfix/gospel/concurrent"
31 "github.com/bfix/gospel/data" 32 "github.com/bfix/gospel/data"
32 "github.com/bfix/gospel/logger" 33 "github.com/bfix/gospel/logger"
33) 34)
@@ -36,6 +37,8 @@ import (
36var ( 37var (
37 ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") 38 ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented")
38 ErrChannelNotOpened = fmt.Errorf("Channel not opened") 39 ErrChannelNotOpened = fmt.Errorf("Channel not opened")
40 ErrChannelInterrupted = fmt.Errorf("Channel interrupted")
41 ErrChannelClosed = fmt.Errorf("Channel closed")
39) 42)
40 43
41//////////////////////////////////////////////////////////////////////// 44////////////////////////////////////////////////////////////////////////
@@ -49,10 +52,11 @@ var (
49// "tcp+1.2.3.4:5" -- for TCP channels 52// "tcp+1.2.3.4:5" -- for TCP channels
50// "udp+1.2.3.4:5" -- for UDP channels 53// "udp+1.2.3.4:5" -- for UDP channels
51type Channel interface { 54type Channel interface {
52 Open(spec string) error 55 Open(spec string) error // open channel (for read/write)
53 Close() error 56 Close() error // close open channel
54 Read([]byte) (int, error) 57 IsOpen() bool // check if channel is open
55 Write([]byte) (int, error) 58 Read([]byte, *concurrent.Signaller) (int, error) // read from channel
59 Write([]byte, *concurrent.Signaller) (int, error) // write to channel
56} 60}
57 61
58// ChannelFactory instantiates specific Channel imülementations. 62// ChannelFactory instantiates specific Channel imülementations.
@@ -140,7 +144,12 @@ func (c *MsgChannel) Close() error {
140} 144}
141 145
142// Send a GNUnet message over a channel. 146// Send a GNUnet message over a channel.
143func (c *MsgChannel) Send(msg message.Message) error { 147func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error {
148
149 // check for closed channel
150 if !c.ch.IsOpen() {
151 return ErrChannelClosed
152 }
144 153
145 // convert message to binary data 154 // convert message to binary data
146 data, err := data.Marshal(msg) 155 data, err := data.Marshal(msg)
@@ -160,7 +169,7 @@ func (c *MsgChannel) Send(msg message.Message) error {
160 } 169 }
161 170
162 // send packet 171 // send packet
163 n, err := c.ch.Write(data) 172 n, err := c.ch.Write(data, sig)
164 if err != nil { 173 if err != nil {
165 return err 174 return err
166 } 175 }
@@ -171,9 +180,15 @@ func (c *MsgChannel) Send(msg message.Message) error {
171} 180}
172 181
173// Receive GNUnet messages over a plain Channel. 182// Receive GNUnet messages over a plain Channel.
174func (c *MsgChannel) Receive() (message.Message, error) { 183func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) {
184 // check for closed channel
185 if !c.ch.IsOpen() {
186 return nil, ErrChannelClosed
187 }
188
189 // get bytes from channel
175 get := func(pos, count int) error { 190 get := func(pos, count int) error {
176 n, err := c.ch.Read(c.buf[pos : pos+count]) 191 n, err := c.ch.Read(c.buf[pos:pos+count], sig)
177 if err != nil { 192 if err != nil {
178 return err 193 return err
179 } 194 }
@@ -182,6 +197,7 @@ func (c *MsgChannel) Receive() (message.Message, error) {
182 } 197 }
183 return nil 198 return nil
184 } 199 }
200
185 if err := get(0, 4); err != nil { 201 if err := get(0, 4); err != nil {
186 return nil, err 202 return nil, err
187 } 203 }
diff --git a/src/gnunet/transport/channel_netw.go b/src/gnunet/transport/channel_netw.go
index c56c089..4f0ba4a 100644
--- a/src/gnunet/transport/channel_netw.go
+++ b/src/gnunet/transport/channel_netw.go
@@ -24,9 +24,30 @@ import (
24 "strconv" 24 "strconv"
25 "strings" 25 "strings"
26 26
27 "github.com/bfix/gospel/concurrent"
27 "github.com/bfix/gospel/logger" 28 "github.com/bfix/gospel/logger"
28) 29)
29 30
31// ChannelResult for read/write operations on channels.
32type ChannelResult struct {
33 count int // number of bytes read/written
34 err error // error (or nil)
35}
36
37// NewChannelResult instanciates a new object with given attributes.
38func NewChannelResult(n int, err error) *ChannelResult {
39 return &ChannelResult{
40 count: n,
41 err: err,
42 }
43}
44
45// Values() returns the attributes of a result instance (for passing up the
46// call stack).
47func (cr *ChannelResult) Values() (int, error) {
48 return cr.count, cr.err
49}
50
30//////////////////////////////////////////////////////////////////////// 51////////////////////////////////////////////////////////////////////////
31// Generic network-based Channel 52// Generic network-based Channel
32 53
@@ -64,27 +85,86 @@ func (c *NetworkChannel) Open(spec string) (err error) {
64// Close a network channel 85// Close a network channel
65func (c *NetworkChannel) Close() error { 86func (c *NetworkChannel) Close() error {
66 if c.conn != nil { 87 if c.conn != nil {
67 return c.conn.Close() 88 rc := c.conn.Close()
89 c.conn = nil
90 return rc
68 } 91 }
69 return ErrChannelNotOpened 92 return ErrChannelNotOpened
70} 93}
71 94
95// IsOpen returns true if the channel is opened
96func (c *NetworkChannel) IsOpen() bool {
97 return c.conn != nil
98}
99
72// Read bytes from a network channel into buffer: Returns the number of read 100// Read bytes from a network channel into buffer: Returns the number of read
73// bytes and an error code. Only works on open channels ;) 101// bytes and an error code. Only works on open channels ;)
74func (c *NetworkChannel) Read(buf []byte) (int, error) { 102// The read can be aborted by sending 'true' on the cmd interface; the
103// channel is closed after such interruption.
104func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, error) {
105 // check if the channel is open
75 if c.conn == nil { 106 if c.conn == nil {
76 return 0, ErrChannelNotOpened 107 return 0, ErrChannelNotOpened
77 } 108 }
78 return c.conn.Read(buf) 109 // perform operation in go-routine
110 result := make(chan *ChannelResult)
111 go func() {
112 result <- NewChannelResult(c.conn.Read(buf))
113 }()
114
115 listener := sig.Listen()
116 defer sig.Drop(listener)
117 for {
118 select {
119 // handle terminate command
120 case x := <-listener:
121 switch val := x.(type) {
122 case bool:
123 if val {
124 c.conn.Close()
125 c.conn = nil
126 return 0, ErrChannelInterrupted
127 }
128 }
129 // handle result of read operation
130 case res := <-result:
131 return res.Values()
132 }
133 }
79} 134}
80 135
81// Write buffer to a network channel: Returns the number of written bytes and 136// Write buffer to a network channel: Returns the number of written bytes and
82// an error code. 137// an error code. The write operation can be aborted by sending 'true' on the
83func (c *NetworkChannel) Write(buf []byte) (int, error) { 138// command channel; the network channel is closed after such interrupt.
139func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, error) {
140 // check if we have an open channel to write to.
84 if c.conn == nil { 141 if c.conn == nil {
85 return 0, ErrChannelNotOpened 142 return 0, ErrChannelNotOpened
86 } 143 }
87 return c.conn.Write(buf) 144 // perform operation in go-routine
145 result := make(chan *ChannelResult)
146 go func() {
147 result <- NewChannelResult(c.conn.Write(buf))
148 }()
149
150 listener := sig.Listen()
151 defer sig.Drop(listener)
152 for {
153 select {
154 // handle terminate command
155 case x := <-listener:
156 switch val := x.(type) {
157 case bool:
158 if val {
159 c.conn.Close()
160 return 0, ErrChannelInterrupted
161 }
162 }
163 // handle result of read operation
164 case res := <-result:
165 return res.Values()
166 }
167 }
88} 168}
89 169
90//////////////////////////////////////////////////////////////////////// 170////////////////////////////////////////////////////////////////////////
diff --git a/src/gnunet/transport/connection.go b/src/gnunet/transport/connection.go
index e2cfae2..03549fc 100644
--- a/src/gnunet/transport/connection.go
+++ b/src/gnunet/transport/connection.go
@@ -21,6 +21,8 @@ package transport
21import ( 21import (
22 "gnunet/core" 22 "gnunet/core"
23 "gnunet/message" 23 "gnunet/message"
24
25 "github.com/bfix/gospel/concurrent"
24) 26)
25 27
26// Connection for communicating peers 28// Connection for communicating peers
@@ -67,11 +69,11 @@ func (c *Connection) Close() error {
67} 69}
68 70
69// Send a message on the connection 71// Send a message on the connection
70func (c *Connection) Send(msg message.Message) error { 72func (c *Connection) Send(msg message.Message, sig *concurrent.Signaller) error {
71 return c.ch.Send(msg) 73 return c.ch.Send(msg, sig)
72} 74}
73 75
74// Receive a message on the connection 76// Receive a message on the connection
75func (c *Connection) Receive() (message.Message, error) { 77func (c *Connection) Receive(sig *concurrent.Signaller) (message.Message, error) {
76 return c.ch.Receive() 78 return c.ch.Receive(sig)
77} 79}
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go
index afad974..80f5c84 100644
--- a/src/gnunet/util/misc.go
+++ b/src/gnunet/util/misc.go
@@ -22,6 +22,7 @@ import (
22 "strings" 22 "strings"
23) 23)
24 24
25// CounterMap
25type CounterMap map[interface{}]int 26type CounterMap map[interface{}]int
26 27
27func (cm CounterMap) Add(i interface{}) int { 28func (cm CounterMap) Add(i interface{}) int {
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go
index 3e09791..e1e0e30 100644
--- a/src/gnunet/util/time.go
+++ b/src/gnunet/util/time.go
@@ -78,6 +78,25 @@ func (t AbsoluteTime) Expired() bool {
78 return t.Val < uint64(time.Now().Unix()) 78 return t.Val < uint64(time.Now().Unix())
79} 79}
80 80
81// Compare two times (-1 = (t < t2), 0 = (t == t2), 1 = (t > t2)
82func (t AbsoluteTime) Compare(t2 AbsoluteTime) int {
83 if t.Val == math.MaxUint64 {
84 if t2.Val == math.MaxUint64 {
85 return 0
86 }
87 return 1
88 }
89 if t2.Val == math.MaxUint64 {
90 return -1
91 }
92 if t.Val < t2.Val {
93 return -1
94 } else if t.Val == t2.Val {
95 return 0
96 }
97 return 1
98}
99
81//---------------------------------------------------------------------- 100//----------------------------------------------------------------------
82// Relative time 101// Relative time
83//---------------------------------------------------------------------- 102//----------------------------------------------------------------------