diff options
author | Bernd Fix <brf@hoi-polloi.org> | 2020-03-31 13:12:30 +0200 |
---|---|---|
committer | Bernd Fix <brf@hoi-polloi.org> | 2020-03-31 13:12:30 +0200 |
commit | 2a2a558aa553aca3394513114d13d4888ae822db (patch) | |
tree | cf148a1cd08f391958656ac815956118f28786fb | |
parent | be8e465cd061612133282db80ab1c7b7387ee251 (diff) | |
download | gnunet-go-2a2a558aa553aca3394513114d13d4888ae822db.tar.gz gnunet-go-2a2a558aa553aca3394513114d13d4888ae822db.zip |
Handle DHT lookup aborts.
-rw-r--r-- | src/cmd/gnunet-service-gns-go/main.go | 2 | ||||
-rw-r--r-- | src/cmd/peer_mockup/process.go | 26 | ||||
-rw-r--r-- | src/gnunet/go.mod | 4 | ||||
-rw-r--r-- | src/gnunet/go.sum | 5 | ||||
-rw-r--r-- | src/gnunet/message/factory.go | 2 | ||||
-rw-r--r-- | src/gnunet/message/msg_dht.go | 37 | ||||
-rw-r--r-- | src/gnunet/modules.go | 11 | ||||
-rw-r--r-- | src/gnunet/service/client.go | 35 | ||||
-rw-r--r-- | src/gnunet/service/context.go | 68 | ||||
-rw-r--r-- | src/gnunet/service/dht/module.go | 49 | ||||
-rw-r--r-- | src/gnunet/service/gns/dns.go | 13 | ||||
-rw-r--r-- | src/gnunet/service/gns/module.go | 109 | ||||
-rw-r--r-- | src/gnunet/service/gns/service.go | 71 | ||||
-rw-r--r-- | src/gnunet/service/namecache/module.go | 5 | ||||
-rw-r--r-- | src/gnunet/service/service.go | 52 | ||||
-rw-r--r-- | src/gnunet/transport/channel.go | 32 | ||||
-rw-r--r-- | src/gnunet/transport/channel_netw.go | 92 | ||||
-rw-r--r-- | src/gnunet/transport/connection.go | 10 | ||||
-rw-r--r-- | src/gnunet/util/misc.go | 1 | ||||
-rw-r--r-- | src/gnunet/util/time.go | 19 |
20 files changed, 547 insertions, 96 deletions
diff --git a/src/cmd/gnunet-service-gns-go/main.go b/src/cmd/gnunet-service-gns-go/main.go index 5f4062a..2815eaa 100644 --- a/src/cmd/gnunet-service-gns-go/main.go +++ b/src/cmd/gnunet-service-gns-go/main.go | |||
@@ -89,6 +89,8 @@ loop: | |||
89 | break loop | 89 | break loop |
90 | case syscall.SIGHUP: | 90 | case syscall.SIGHUP: |
91 | logger.Println(logger.INFO, "[gns] SIGHUP") | 91 | logger.Println(logger.INFO, "[gns] SIGHUP") |
92 | case syscall.SIGURG: | ||
93 | // TODO: https://github.com/golang/go/issues/37942 | ||
92 | default: | 94 | default: |
93 | logger.Println(logger.INFO, "[gns] Unhandled signal: "+sig.String()) | 95 | logger.Println(logger.INFO, "[gns] Unhandled signal: "+sig.String()) |
94 | } | 96 | } |
diff --git a/src/cmd/peer_mockup/process.go b/src/cmd/peer_mockup/process.go index a26177b..510fc48 100644 --- a/src/cmd/peer_mockup/process.go +++ b/src/cmd/peer_mockup/process.go | |||
@@ -9,6 +9,12 @@ import ( | |||
9 | "gnunet/message" | 9 | "gnunet/message" |
10 | "gnunet/transport" | 10 | "gnunet/transport" |
11 | "gnunet/util" | 11 | "gnunet/util" |
12 | |||
13 | "github.com/bfix/gospel/concurrent" | ||
14 | ) | ||
15 | |||
16 | var ( | ||
17 | sig = concurrent.NewSignaller() | ||
12 | ) | 18 | ) |
13 | 19 | ||
14 | func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | 20 | func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { |
@@ -20,7 +26,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
20 | in := make(chan message.Message) | 26 | in := make(chan message.Message) |
21 | go func() { | 27 | go func() { |
22 | for { | 28 | for { |
23 | msg, err := c.Receive() | 29 | msg, err := c.Receive(sig) |
24 | if err != nil { | 30 | if err != nil { |
25 | fmt.Printf("Receive: %s\n", err.Error()) | 31 | fmt.Printf("Receive: %s\n", err.Error()) |
26 | return | 32 | return |
@@ -33,7 +39,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
33 | init := (from == p) | 39 | init := (from == p) |
34 | if init { | 40 | if init { |
35 | peerid := util.NewPeerID(p.GetID()) | 41 | peerid := util.NewPeerID(p.GetID()) |
36 | c.Send(message.NewTransportTcpWelcomeMsg(peerid)) | 42 | c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig) |
37 | } | 43 | } |
38 | 44 | ||
39 | // remember peer addresses (only ONE!) | 45 | // remember peer addresses (only ONE!) |
@@ -53,11 +59,11 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
53 | case *message.TransportTcpWelcomeMsg: | 59 | case *message.TransportTcpWelcomeMsg: |
54 | peerid := util.NewPeerID(p.GetID()) | 60 | peerid := util.NewPeerID(p.GetID()) |
55 | if init { | 61 | if init { |
56 | c.Send(message.NewHelloMsg(peerid)) | 62 | c.Send(message.NewHelloMsg(peerid), sig) |
57 | target := util.NewPeerID(t.GetID()) | 63 | target := util.NewPeerID(t.GetID()) |
58 | c.Send(message.NewTransportPingMsg(target, tAddr)) | 64 | c.Send(message.NewTransportPingMsg(target, tAddr), sig) |
59 | } else { | 65 | } else { |
60 | c.Send(message.NewTransportTcpWelcomeMsg(peerid)) | 66 | c.Send(message.NewTransportTcpWelcomeMsg(peerid), sig) |
61 | } | 67 | } |
62 | 68 | ||
63 | case *message.HelloMsg: | 69 | case *message.HelloMsg: |
@@ -67,7 +73,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
67 | if err := mOut.Sign(p.PrvKey()); err != nil { | 73 | if err := mOut.Sign(p.PrvKey()); err != nil { |
68 | return err | 74 | return err |
69 | } | 75 | } |
70 | c.Send(mOut) | 76 | c.Send(mOut, sig) |
71 | 77 | ||
72 | case *message.TransportPongMsg: | 78 | case *message.TransportPongMsg: |
73 | rc, err := msg.Verify(t.PubKey()) | 79 | rc, err := msg.Verify(t.PubKey()) |
@@ -79,14 +85,14 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
79 | } | 85 | } |
80 | send[message.TRANSPORT_PONG] = true | 86 | send[message.TRANSPORT_PONG] = true |
81 | if mOut, ok := pending[message.TRANSPORT_SESSION_SYN]; ok { | 87 | if mOut, ok := pending[message.TRANSPORT_SESSION_SYN]; ok { |
82 | c.Send(mOut) | 88 | c.Send(mOut, sig) |
83 | } | 89 | } |
84 | 90 | ||
85 | case *message.SessionSynMsg: | 91 | case *message.SessionSynMsg: |
86 | mOut := message.NewSessionSynAckMsg() | 92 | mOut := message.NewSessionSynAckMsg() |
87 | mOut.Timestamp = msg.Timestamp | 93 | mOut.Timestamp = msg.Timestamp |
88 | if send[message.TRANSPORT_PONG] { | 94 | if send[message.TRANSPORT_PONG] { |
89 | c.Send(mOut) | 95 | c.Send(mOut, sig) |
90 | } else { | 96 | } else { |
91 | pending[message.TRANSPORT_SESSION_SYN] = mOut | 97 | pending[message.TRANSPORT_SESSION_SYN] = mOut |
92 | } | 98 | } |
@@ -97,7 +103,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
97 | case *message.SessionAckMsg: | 103 | case *message.SessionAckMsg: |
98 | 104 | ||
99 | case *message.SessionKeepAliveMsg: | 105 | case *message.SessionKeepAliveMsg: |
100 | c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce)) | 106 | c.Send(message.NewSessionKeepAliveRespMsg(msg.Nonce), sig) |
101 | 107 | ||
102 | case *message.EphemeralKeyMsg: | 108 | case *message.EphemeralKeyMsg: |
103 | rc, err := msg.Verify(t.PubKey()) | 109 | rc, err := msg.Verify(t.PubKey()) |
@@ -108,7 +114,7 @@ func process(ch *transport.MsgChannel, from, to *core.Peer) (err error) { | |||
108 | return errors.New("EPHKEY verification failed") | 114 | return errors.New("EPHKEY verification failed") |
109 | } | 115 | } |
110 | t.SetEphKeyMsg(msg) | 116 | t.SetEphKeyMsg(msg) |
111 | c.Send(p.EphKeyMsg()) | 117 | c.Send(p.EphKeyMsg(), sig) |
112 | secret := crypto.SharedSecret(p.EphPrvKey(), t.EphKeyMsg().Public()) | 118 | secret := crypto.SharedSecret(p.EphPrvKey(), t.EphKeyMsg().Public()) |
113 | c.SharedSecret(util.Clone(secret.Bits[:])) | 119 | c.SharedSecret(util.Clone(secret.Bits[:])) |
114 | 120 | ||
diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod index 83720fd..443666f 100644 --- a/src/gnunet/go.mod +++ b/src/gnunet/go.mod | |||
@@ -3,7 +3,7 @@ module gnunet | |||
3 | go 1.12 | 3 | go 1.12 |
4 | 4 | ||
5 | require ( | 5 | require ( |
6 | github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 | 6 | github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46 |
7 | github.com/miekg/dns v1.1.26 | 7 | github.com/miekg/dns v1.1.26 |
8 | golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 | 8 | golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 |
9 | ) | 9 | ) |
diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum index e9ece97..8c956b4 100644 --- a/src/gnunet/go.sum +++ b/src/gnunet/go.sum | |||
@@ -1,5 +1,7 @@ | |||
1 | github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ= | 1 | github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449 h1:oIq3s14sMh1sq791v9VpR+GJvhVGbvuOWlfTjruRTDQ= |
2 | github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA= | 2 | github.com/bfix/gospel v0.0.0-20190922182041-6fcd6d4fd449/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA= |
3 | github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46 h1:5aNd1/ISbO1ltgmyUGza7kdaN4fD/Qal6uKZk9goMhw= | ||
4 | github.com/bfix/gospel v0.0.0-20200326093412-d1b2381f0c46/go.mod h1:RQYETFV9SP+VriIsHVqCntRpSbbRvCBnNTtbUl9NAKA= | ||
3 | github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= | 5 | github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= |
4 | github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= | 6 | github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= |
5 | golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= | 7 | golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
@@ -7,6 +9,8 @@ golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 h1:Gv7RPwsi3eZ2Fgewe3CBsu | |||
7 | golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= | 9 | golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= |
8 | golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= | 10 | golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= |
9 | golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= | 11 | golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= |
12 | golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= | ||
13 | golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= | ||
10 | golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= | 14 | golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
11 | golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | 15 | golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
12 | golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= | 16 | golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= |
@@ -19,6 +23,7 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7w | |||
19 | golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M= | 23 | golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe h1:6fAMxZRR6sl1Uq8U61gxU+kPTs2tR8uOySCbBP7BN/M= |
20 | golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | 24 | golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
21 | golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= | 25 | golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
26 | golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= | ||
22 | golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= | 27 | golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= |
23 | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | 28 | golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
24 | golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= | 29 | golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= |
diff --git a/src/gnunet/message/factory.go b/src/gnunet/message/factory.go index e6e6e73..33e806b 100644 --- a/src/gnunet/message/factory.go +++ b/src/gnunet/message/factory.go | |||
@@ -59,6 +59,8 @@ func NewEmptyMessage(msgType uint16) (Message, error) { | |||
59 | //------------------------------------------------------------------ | 59 | //------------------------------------------------------------------ |
60 | case DHT_CLIENT_GET: | 60 | case DHT_CLIENT_GET: |
61 | return NewDHTClientGetMsg(nil), nil | 61 | return NewDHTClientGetMsg(nil), nil |
62 | case DHT_CLIENT_GET_STOP: | ||
63 | return NewDHTClientGetStopMsg(nil), nil | ||
62 | case DHT_CLIENT_RESULT: | 64 | case DHT_CLIENT_RESULT: |
63 | return NewDHTClientResultMsg(nil), nil | 65 | return NewDHTClientResultMsg(nil), nil |
64 | 66 | ||
diff --git a/src/gnunet/message/msg_dht.go b/src/gnunet/message/msg_dht.go index 925cb71..62a233f 100644 --- a/src/gnunet/message/msg_dht.go +++ b/src/gnunet/message/msg_dht.go | |||
@@ -126,3 +126,40 @@ func (m *DHTClientResultMsg) String() string { | |||
126 | func (msg *DHTClientResultMsg) Header() *MessageHeader { | 126 | func (msg *DHTClientResultMsg) Header() *MessageHeader { |
127 | return &MessageHeader{msg.MsgSize, msg.MsgType} | 127 | return &MessageHeader{msg.MsgSize, msg.MsgType} |
128 | } | 128 | } |
129 | |||
130 | //---------------------------------------------------------------------- | ||
131 | // DHT_CLIENT_GET_STOP | ||
132 | //---------------------------------------------------------------------- | ||
133 | |||
134 | // DHTClientGetStopMsg | ||
135 | type DHTClientGetStopMsg struct { | ||
136 | MsgSize uint16 `order:"big"` // total size of message | ||
137 | MsgType uint16 `order:"big"` // DHT_CLIENT_GET_STOP (144) | ||
138 | Reserved uint32 `order:"big"` // Reserved (0) | ||
139 | Id uint64 `order:"big"` // Unique ID identifying this request | ||
140 | Key *crypto.HashCode // The key to search for | ||
141 | } | ||
142 | |||
143 | // NewDHTClientGetStopMsg creates a new default DHTClientGetStopMsg object. | ||
144 | func NewDHTClientGetStopMsg(key *crypto.HashCode) *DHTClientGetStopMsg { | ||
145 | if key == nil { | ||
146 | key = new(crypto.HashCode) | ||
147 | } | ||
148 | return &DHTClientGetStopMsg{ | ||
149 | MsgSize: 80, | ||
150 | MsgType: DHT_CLIENT_GET_STOP, | ||
151 | Reserved: 0, // mandatory | ||
152 | Id: 0, | ||
153 | Key: key, | ||
154 | } | ||
155 | } | ||
156 | |||
157 | // String returns a human-readable representation of the message. | ||
158 | func (m *DHTClientGetStopMsg) String() string { | ||
159 | return fmt.Sprintf("DHTClientGetStopMsg{Id:%d,Key=%s}", m.Id, hex.EncodeToString(m.Key.Bits)) | ||
160 | } | ||
161 | |||
162 | // Header returns the message header in a separate instance. | ||
163 | func (msg *DHTClientGetStopMsg) Header() *MessageHeader { | ||
164 | return &MessageHeader{msg.MsgSize, msg.MsgType} | ||
165 | } | ||
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go index bc2993e..b71eb40 100644 --- a/src/gnunet/modules.go +++ b/src/gnunet/modules.go | |||
@@ -29,6 +29,7 @@ | |||
29 | package gnunet | 29 | package gnunet |
30 | 30 | ||
31 | import ( | 31 | import ( |
32 | "gnunet/service/dht" | ||
32 | "gnunet/service/gns" | 33 | "gnunet/service/gns" |
33 | "gnunet/service/namecache" | 34 | "gnunet/service/namecache" |
34 | ) | 35 | ) |
@@ -37,6 +38,7 @@ import ( | |||
37 | type Instances struct { | 38 | type Instances struct { |
38 | GNS *gns.GNSModule | 39 | GNS *gns.GNSModule |
39 | Namecache *namecache.NamecacheModule | 40 | Namecache *namecache.NamecacheModule |
41 | DHT *dht.DHTModule | ||
40 | } | 42 | } |
41 | 43 | ||
42 | // Local reference to instance list | 44 | // Local reference to instance list |
@@ -50,9 +52,14 @@ func init() { | |||
50 | // Namecache (no calls to other modules) | 52 | // Namecache (no calls to other modules) |
51 | Modules.Namecache = new(namecache.NamecacheModule) | 53 | Modules.Namecache = new(namecache.NamecacheModule) |
52 | 54 | ||
55 | // DHT (no calls to other modules) | ||
56 | Modules.DHT = new(dht.DHTModule) | ||
57 | |||
53 | // GNS (calls Namecache, DHT and Identity) | 58 | // GNS (calls Namecache, DHT and Identity) |
54 | Modules.GNS = &gns.GNSModule{ | 59 | Modules.GNS = &gns.GNSModule{ |
55 | LookupLocal: Modules.Namecache.Get, | 60 | LookupLocal: Modules.Namecache.Get, |
56 | StoreLocal: Modules.Namecache.Put, | 61 | StoreLocal: Modules.Namecache.Put, |
62 | LookupRemote: Modules.DHT.Get, | ||
63 | CancelRemote: Modules.DHT.Cancel, | ||
57 | } | 64 | } |
58 | } | 65 | } |
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go index 1e54c2d..fe3fa9e 100644 --- a/src/gnunet/service/client.go +++ b/src/gnunet/service/client.go | |||
@@ -25,36 +25,49 @@ import ( | |||
25 | "github.com/bfix/gospel/logger" | 25 | "github.com/bfix/gospel/logger" |
26 | ) | 26 | ) |
27 | 27 | ||
28 | // Client | 28 | // Client type: Use to perform client-side interactions with GNUnet services. |
29 | type Client struct { | 29 | type Client struct { |
30 | ch *transport.MsgChannel | 30 | ch *transport.MsgChannel // channel for message exchange |
31 | } | 31 | } |
32 | 32 | ||
33 | // NewClient | 33 | // NewClient creates a new client instance for the given channel endpoint. |
34 | func NewClient(endp string) (*Client, error) { | 34 | func NewClient(endp string) (*Client, error) { |
35 | // | 35 | // create a new channel to endpoint. |
36 | ch, err := transport.NewChannel(endp) | 36 | ch, err := transport.NewChannel(endp) |
37 | if err != nil { | 37 | if err != nil { |
38 | return nil, err | 38 | return nil, err |
39 | } | 39 | } |
40 | // wrap into a message channel for the client. | ||
40 | return &Client{ | 41 | return &Client{ |
41 | ch: transport.NewMsgChannel(ch), | 42 | ch: transport.NewMsgChannel(ch), |
42 | }, nil | 43 | }, nil |
43 | } | 44 | } |
44 | 45 | ||
45 | func (c *Client) SendRequest(req message.Message) error { | 46 | // SendRequest sends a give message to the service. |
46 | return c.ch.Send(req) | 47 | func (c *Client) SendRequest(ctx *SessionContext, req message.Message) error { |
48 | return c.ch.Send(req, ctx.Signaller()) | ||
47 | } | 49 | } |
48 | 50 | ||
49 | func (c *Client) ReceiveResponse() (message.Message, error) { | 51 | // ReceiveResponse waits for a response from the service; it can be interrupted |
50 | return c.ch.Receive() | 52 | // by sending "false" to the cmd channel. |
53 | func (c *Client) ReceiveResponse(ctx *SessionContext) (message.Message, error) { | ||
54 | return c.ch.Receive(ctx.Signaller()) | ||
51 | } | 55 | } |
52 | 56 | ||
57 | // Close a client; no further message exchange is possible. | ||
53 | func (c *Client) Close() error { | 58 | func (c *Client) Close() error { |
54 | return c.ch.Close() | 59 | return c.ch.Close() |
55 | } | 60 | } |
56 | 61 | ||
57 | func ServiceRequestResponse(caller, callee, endp string, req message.Message) (message.Message, error) { | 62 | // ServiceRequestResponse is a helper method for a one request - one response |
63 | // secenarios of client/serice interactions. | ||
64 | func ServiceRequestResponse( | ||
65 | ctx *SessionContext, | ||
66 | caller string, | ||
67 | callee string, | ||
68 | endp string, | ||
69 | req message.Message) (message.Message, error) { | ||
70 | |||
58 | // client-connect to the service | 71 | // client-connect to the service |
59 | logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) | 72 | logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) |
60 | cl, err := NewClient(endp) | 73 | cl, err := NewClient(endp) |
@@ -63,13 +76,13 @@ func ServiceRequestResponse(caller, callee, endp string, req message.Message) (m | |||
63 | } | 76 | } |
64 | // send request | 77 | // send request |
65 | logger.Printf(logger.DBG, "[%s] Sending request to %s service\n", caller, callee) | 78 | logger.Printf(logger.DBG, "[%s] Sending request to %s service\n", caller, callee) |
66 | if err = cl.SendRequest(req); err != nil { | 79 | if err = cl.SendRequest(ctx, req); err != nil { |
67 | return nil, err | 80 | return nil, err |
68 | } | 81 | } |
69 | // wait for a single response, then close the connection | 82 | // wait for a single response, then close the connection |
70 | logger.Printf(logger.DBG, "[%s] Waiting for response from %s service\n", caller, callee) | 83 | logger.Printf(logger.DBG, "[%s] Waiting for response from %s service\n", caller, callee) |
71 | var resp message.Message | 84 | var resp message.Message |
72 | if resp, err = cl.ReceiveResponse(); err != nil { | 85 | if resp, err = cl.ReceiveResponse(ctx); err != nil { |
73 | return nil, err | 86 | return nil, err |
74 | } | 87 | } |
75 | logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n", caller, callee) | 88 | logger.Printf(logger.DBG, "[%s] Closing connection to %s service\n", caller, callee) |
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go new file mode 100644 index 0000000..4896bd5 --- /dev/null +++ b/src/gnunet/service/context.go | |||
@@ -0,0 +1,68 @@ | |||
1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. | ||
2 | // Copyright (C) 2019, 2020 Bernd Fix >Y< | ||
3 | // | ||
4 | // gnunet-go is free software: you can redistribute it and/or modify it | ||
5 | // under the terms of the GNU Affero General Public License as published | ||
6 | // by the Free Software Foundation, either version 3 of the License, | ||
7 | // or (at your option) any later version. | ||
8 | // | ||
9 | // gnunet-go is distributed in the hope that it will be useful, but | ||
10 | // WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
12 | // Affero General Public License for more details. | ||
13 | // | ||
14 | // You should have received a copy of the GNU Affero General Public License | ||
15 | // along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
16 | // | ||
17 | // SPDX-License-Identifier: AGPL3.0-or-later | ||
18 | |||
19 | package service | ||
20 | |||
21 | import ( | ||
22 | "sync" | ||
23 | |||
24 | "gnunet/util" | ||
25 | |||
26 | "github.com/bfix/gospel/concurrent" | ||
27 | ) | ||
28 | |||
29 | // SessionContext is used to set a context for each client connection handled | ||
30 | // by a service; the session is handled by the 'ServeClient' method of a | ||
31 | // service implementation. | ||
32 | type SessionContext struct { | ||
33 | Id int // session identifier | ||
34 | wg *sync.WaitGroup // wait group for the session | ||
35 | sig *concurrent.Signaller // signaller for the session | ||
36 | } | ||
37 | |||
38 | // NewSessionContext instantiates a new session context. | ||
39 | func NewSessionContext() *SessionContext { | ||
40 | return &SessionContext{ | ||
41 | Id: util.NextID(), | ||
42 | wg: new(sync.WaitGroup), | ||
43 | sig: concurrent.NewSignaller(), | ||
44 | } | ||
45 | } | ||
46 | |||
47 | // Cancel all go-routines associated with this context. | ||
48 | func (ctx *SessionContext) Cancel() { | ||
49 | // send signal to terminate... | ||
50 | ctx.sig.Send(true) | ||
51 | // wait for session go-routines to finish | ||
52 | ctx.wg.Wait() | ||
53 | } | ||
54 | |||
55 | // Add a go-routine to the wait group. | ||
56 | func (ctx *SessionContext) Add() { | ||
57 | ctx.wg.Add(1) | ||
58 | } | ||
59 | |||
60 | // Remove a go-routine from the wait group. | ||
61 | func (ctx *SessionContext) Remove() { | ||
62 | ctx.wg.Done() | ||
63 | } | ||
64 | |||
65 | // Signaller returns the working instance for the context. | ||
66 | func (ctx *SessionContext) Signaller() *concurrent.Signaller { | ||
67 | return ctx.sig | ||
68 | } | ||
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go new file mode 100644 index 0000000..a54d0eb --- /dev/null +++ b/src/gnunet/service/dht/module.go | |||
@@ -0,0 +1,49 @@ | |||
1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. | ||
2 | // Copyright (C) 2019, 2020 Bernd Fix >Y< | ||
3 | // | ||
4 | // gnunet-go is free software: you can redistribute it and/or modify it | ||
5 | // under the terms of the GNU Affero General Public License as published | ||
6 | // by the Free Software Foundation, either version 3 of the License, | ||
7 | // or (at your option) any later version. | ||
8 | // | ||
9 | // gnunet-go is distributed in the hope that it will be useful, but | ||
10 | // WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
12 | // Affero General Public License for more details. | ||
13 | // | ||
14 | // You should have received a copy of the GNU Affero General Public License | ||
15 | // along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
16 | // | ||
17 | // SPDX-License-Identifier: AGPL3.0-or-later | ||
18 | |||
19 | package dht | ||
20 | |||
21 | import ( | ||
22 | "gnunet/message" | ||
23 | "gnunet/service" | ||
24 | "gnunet/service/gns" | ||
25 | ) | ||
26 | |||
27 | //====================================================================== | ||
28 | // "DHT" implementation | ||
29 | //====================================================================== | ||
30 | |||
31 | //---------------------------------------------------------------------- | ||
32 | // Put and get blocks into/from a DHT. | ||
33 | //---------------------------------------------------------------------- | ||
34 | |||
35 | // DHT handles the permanent storage of blocks under the query key. | ||
36 | type DHTModule struct { | ||
37 | } | ||
38 | |||
39 | func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) (*message.GNSBlock, error) { | ||
40 | return nil, nil | ||
41 | } | ||
42 | |||
43 | func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) error { | ||
44 | return nil | ||
45 | } | ||
46 | |||
47 | func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { | ||
48 | return nil | ||
49 | } | ||
diff --git a/src/gnunet/service/gns/dns.go b/src/gnunet/service/gns/dns.go index 28e9813..7a9a30c 100644 --- a/src/gnunet/service/gns/dns.go +++ b/src/gnunet/service/gns/dns.go | |||
@@ -26,6 +26,7 @@ import ( | |||
26 | 26 | ||
27 | "gnunet/enums" | 27 | "gnunet/enums" |
28 | "gnunet/message" | 28 | "gnunet/message" |
29 | "gnunet/service" | ||
29 | "gnunet/util" | 30 | "gnunet/util" |
30 | 31 | ||
31 | "github.com/bfix/gospel/crypto/ed25519" | 32 | "github.com/bfix/gospel/crypto/ed25519" |
@@ -202,10 +203,16 @@ func QueryDNS(id int, name string, server net.IP, kind RRTypeList) *message.GNSR | |||
202 | // ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in | 203 | // ResolveDNS resolves a name in DNS. Multiple DNS servers are queried in |
203 | // parallel; the first result delivered by any of the servers is returned | 204 | // parallel; the first result delivered by any of the servers is returned |
204 | // as the result list of matching resource records. | 205 | // as the result list of matching resource records. |
205 | func (gns *GNSModule) ResolveDNS(name string, servers []string, kind RRTypeList, pkey *ed25519.PublicKey, depth int) (set *message.GNSRecordSet, err error) { | 206 | func (gns *GNSModule) ResolveDNS( |
206 | logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", name) | 207 | ctx *service.SessionContext, |
208 | name string, | ||
209 | servers []string, | ||
210 | kind RRTypeList, | ||
211 | pkey *ed25519.PublicKey, | ||
212 | depth int) (set *message.GNSRecordSet, err error) { | ||
207 | 213 | ||
208 | // start DNS queries concurrently | 214 | // start DNS queries concurrently |
215 | logger.Printf(logger.DBG, "[dns] Resolution of '%s' starting...\n", name) | ||
209 | res := make(chan *message.GNSRecordSet) | 216 | res := make(chan *message.GNSRecordSet) |
210 | running := 0 | 217 | running := 0 |
211 | for _, srv := range servers { | 218 | for _, srv := range servers { |
@@ -215,7 +222,7 @@ func (gns *GNSModule) ResolveDNS(name string, servers []string, kind RRTypeList, | |||
215 | if addr == nil { | 222 | if addr == nil { |
216 | // no, it is a name... try to resolve an IP address from the name | 223 | // no, it is a name... try to resolve an IP address from the name |
217 | query := NewRRTypeList(enums.GNS_TYPE_DNS_A, enums.GNS_TYPE_DNS_AAAA) | 224 | query := NewRRTypeList(enums.GNS_TYPE_DNS_A, enums.GNS_TYPE_DNS_AAAA) |
218 | if set, err = gns.ResolveUnknown(srv, nil, pkey, query, depth+1); err != nil { | 225 | if set, err = gns.ResolveUnknown(ctx, srv, nil, pkey, query, depth+1); err != nil { |
219 | logger.Printf(logger.ERROR, "[dns] Can't resolve NS server '%s': %s\n", srv, err.Error()) | 226 | logger.Printf(logger.ERROR, "[dns] Can't resolve NS server '%s': %s\n", srv, err.Error()) |
220 | continue | 227 | continue |
221 | } | 228 | } |
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go index d067633..43f7b7a 100644 --- a/src/gnunet/service/gns/module.go +++ b/src/gnunet/service/gns/module.go | |||
@@ -26,6 +26,8 @@ import ( | |||
26 | "gnunet/crypto" | 26 | "gnunet/crypto" |
27 | "gnunet/enums" | 27 | "gnunet/enums" |
28 | "gnunet/message" | 28 | "gnunet/message" |
29 | "gnunet/service" | ||
30 | "gnunet/transport" | ||
29 | "gnunet/util" | 31 | "gnunet/util" |
30 | 32 | ||
31 | "github.com/bfix/gospel/crypto/ed25519" | 33 | "github.com/bfix/gospel/crypto/ed25519" |
@@ -110,15 +112,22 @@ func NewQuery(pkey *ed25519.PublicKey, label string) *Query { | |||
110 | // GNSModule handles the resolution of GNS names to RRs bundled in a block. | 112 | // GNSModule handles the resolution of GNS names to RRs bundled in a block. |
111 | type GNSModule struct { | 113 | type GNSModule struct { |
112 | // Use function references for calls to methods in other modules: | 114 | // Use function references for calls to methods in other modules: |
113 | // | 115 | LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) |
114 | LookupLocal func(query *Query) (*message.GNSBlock, error) | 116 | StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error |
115 | StoreLocal func(block *message.GNSBlock) error | 117 | LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) |
116 | LookupRemote func(query *Query) (*message.GNSBlock, error) | 118 | CancelRemote func(ctx *service.SessionContext, query *Query) error |
117 | } | 119 | } |
118 | 120 | ||
119 | // Resolve a GNS name with multiple labels. If pkey is not nil, the name | 121 | // Resolve a GNS name with multiple labels. If pkey is not nil, the name |
120 | // is interpreted as "relative to current zone". | 122 | // is interpreted as "relative to current zone". |
121 | func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { | 123 | func (gns *GNSModule) Resolve( |
124 | ctx *service.SessionContext, | ||
125 | path string, | ||
126 | pkey *ed25519.PublicKey, | ||
127 | kind RRTypeList, | ||
128 | mode int, | ||
129 | depth int) (set *message.GNSRecordSet, err error) { | ||
130 | |||
122 | // check for recursion depth | 131 | // check for recursion depth |
123 | if depth > config.Cfg.GNS.MaxDepth { | 132 | if depth > config.Cfg.GNS.MaxDepth { |
124 | return nil, ErrGNSRecursionExceeded | 133 | return nil, ErrGNSRecursionExceeded |
@@ -130,14 +139,20 @@ func (gns *GNSModule) Resolve(path string, pkey *ed25519.PublicKey, kind RRTypeL | |||
130 | // check for relative path | 139 | // check for relative path |
131 | if pkey != nil { | 140 | if pkey != nil { |
132 | //resolve relative path | 141 | //resolve relative path |
133 | return gns.ResolveRelative(names, pkey, kind, mode, depth) | 142 | return gns.ResolveRelative(ctx, names, pkey, kind, mode, depth) |
134 | } | 143 | } |
135 | // resolve absolute path | 144 | // resolve absolute path |
136 | return gns.ResolveAbsolute(names, kind, mode, depth) | 145 | return gns.ResolveAbsolute(ctx, names, kind, mode, depth) |
137 | } | 146 | } |
138 | 147 | ||
139 | // Resolve a fully qualified GNS absolute name (with multiple labels). | 148 | // Resolve a fully qualified GNS absolute name (with multiple labels). |
140 | func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { | 149 | func (gns *GNSModule) ResolveAbsolute( |
150 | ctx *service.SessionContext, | ||
151 | labels []string, | ||
152 | kind RRTypeList, | ||
153 | mode int, | ||
154 | depth int) (set *message.GNSRecordSet, err error) { | ||
155 | |||
141 | // get the zone key for the TLD | 156 | // get the zone key for the TLD |
142 | pkey := gns.GetZoneKey(labels[0]) | 157 | pkey := gns.GetZoneKey(labels[0]) |
143 | if pkey == nil { | 158 | if pkey == nil { |
@@ -146,12 +161,19 @@ func (gns *GNSModule) ResolveAbsolute(labels []string, kind RRTypeList, mode int | |||
146 | return | 161 | return |
147 | } | 162 | } |
148 | // continue with resolution relative to a zone. | 163 | // continue with resolution relative to a zone. |
149 | return gns.ResolveRelative(labels[1:], pkey, kind, mode, depth) | 164 | return gns.ResolveRelative(ctx, labels[1:], pkey, kind, mode, depth) |
150 | } | 165 | } |
151 | 166 | ||
152 | // Resolve relative path (to a given zone) recursively by processing simple | 167 | // Resolve relative path (to a given zone) recursively by processing simple |
153 | // (PKEY,Label) lookups in sequence and handle intermediate GNS record types | 168 | // (PKEY,Label) lookups in sequence and handle intermediate GNS record types |
154 | func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, kind RRTypeList, mode int, depth int) (set *message.GNSRecordSet, err error) { | 169 | func (gns *GNSModule) ResolveRelative( |
170 | ctx *service.SessionContext, | ||
171 | labels []string, | ||
172 | pkey *ed25519.PublicKey, | ||
173 | kind RRTypeList, | ||
174 | mode int, | ||
175 | depth int) (set *message.GNSRecordSet, err error) { | ||
176 | |||
155 | // Process all names in sequence | 177 | // Process all names in sequence |
156 | var ( | 178 | var ( |
157 | records []*message.GNSResourceRecord // final resource records from resolution | 179 | records []*message.GNSResourceRecord // final resource records from resolution |
@@ -162,7 +184,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, | |||
162 | 184 | ||
163 | // resolve next level | 185 | // resolve next level |
164 | var block *message.GNSBlock | 186 | var block *message.GNSBlock |
165 | if block, err = gns.Lookup(pkey, labels[0], mode); err != nil { | 187 | if block, err = gns.Lookup(ctx, pkey, labels[0], mode); err != nil { |
166 | // failed to resolve name | 188 | // failed to resolve name |
167 | return | 189 | return |
168 | } | 190 | } |
@@ -225,10 +247,23 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, | |||
225 | lbls += "." | 247 | lbls += "." |
226 | } | 248 | } |
227 | fqdn := lbls + inst.Query | 249 | fqdn := lbls + inst.Query |
228 | if set, err = gns.ResolveDNS(fqdn, inst.Servers, kind, pkey, depth); err != nil { | 250 | if set, err = gns.ResolveDNS(ctx, fqdn, inst.Servers, kind, pkey, depth); err != nil { |
229 | logger.Println(logger.ERROR, "[gns] GNS2DNS resolution failed.") | 251 | logger.Println(logger.ERROR, "[gns] GNS2DNS resolution failed.") |
230 | return | 252 | return |
231 | } | 253 | } |
254 | // add synthetic LEHO record if we have results and are at the | ||
255 | // end of the name (labels). | ||
256 | if len(set.Records) > 0 && len(labels) == 1 { | ||
257 | // add LEHO supplemental record: The TTL of the new record is | ||
258 | // the longest-living record in the current set. | ||
259 | expires := util.AbsoluteTimeNow() | ||
260 | for _, rec := range set.Records { | ||
261 | if rec.Expires.Compare(expires) > 0 { | ||
262 | expires = rec.Expires | ||
263 | } | ||
264 | } | ||
265 | set.Records = append(set.Records, gns.newLEHORecord(inst.Query, expires)) | ||
266 | } | ||
232 | // we are done with resolution; pass on records to caller | 267 | // we are done with resolution; pass on records to caller |
233 | records = set.Records | 268 | records = set.Records |
234 | break | 269 | break |
@@ -250,7 +285,7 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, | |||
250 | break | 285 | break |
251 | } | 286 | } |
252 | logger.Println(logger.DBG, "[gns] CNAME resolution required.") | 287 | logger.Println(logger.DBG, "[gns] CNAME resolution required.") |
253 | if set, err = gns.ResolveUnknown(inst.name, labels, pkey, kind, depth+1); err != nil { | 288 | if set, err = gns.ResolveUnknown(ctx, inst.name, labels, pkey, kind, depth+1); err != nil { |
254 | logger.Println(logger.ERROR, "[gns] CNAME resolution failed.") | 289 | logger.Println(logger.ERROR, "[gns] CNAME resolution failed.") |
255 | return | 290 | return |
256 | } | 291 | } |
@@ -300,7 +335,14 @@ func (gns *GNSModule) ResolveRelative(labels []string, pkey *ed25519.PublicKey, | |||
300 | // relative to the zone PKEY. If the name is an absolute GNS name (ending in | 335 | // relative to the zone PKEY. If the name is an absolute GNS name (ending in |
301 | // a PKEY TLD), it is also resolved with GNS. All other names are resolved | 336 | // a PKEY TLD), it is also resolved with GNS. All other names are resolved |
302 | // via DNS queries. | 337 | // via DNS queries. |
303 | func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey *ed25519.PublicKey, kind RRTypeList, depth int) (set *message.GNSRecordSet, err error) { | 338 | func (gns *GNSModule) ResolveUnknown( |
339 | ctx *service.SessionContext, | ||
340 | name string, | ||
341 | labels []string, | ||
342 | pkey *ed25519.PublicKey, | ||
343 | kind RRTypeList, | ||
344 | depth int) (set *message.GNSRecordSet, err error) { | ||
345 | |||
304 | // relative GNS-based server name? | 346 | // relative GNS-based server name? |
305 | if strings.HasSuffix(name, ".+") { | 347 | if strings.HasSuffix(name, ".+") { |
306 | // resolve server name relative to current zone | 348 | // resolve server name relative to current zone |
@@ -308,14 +350,14 @@ func (gns *GNSModule) ResolveUnknown(name string, labels []string, pkey *ed25519 | |||
308 | for _, label := range util.ReverseStringList(labels) { | 350 | for _, label := range util.ReverseStringList(labels) { |
309 | name += "." + label | 351 | name += "." + label |
310 | } | 352 | } |
311 | if set, err = gns.Resolve(name, pkey, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { | 353 | if set, err = gns.Resolve(ctx, name, pkey, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { |
312 | return | 354 | return |
313 | } | 355 | } |
314 | } else { | 356 | } else { |
315 | // check for absolute GNS name (with PKEY as TLD) | 357 | // check for absolute GNS name (with PKEY as TLD) |
316 | if zk := gns.GetZoneKey(name); zk != nil { | 358 | if zk := gns.GetZoneKey(name); zk != nil { |
317 | // resolve absolute GNS name (name ends in a PKEY) | 359 | // resolve absolute GNS name (name ends in a PKEY) |
318 | if set, err = gns.Resolve(util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { | 360 | if set, err = gns.Resolve(ctx, util.StripPathRight(name), zk, kind, enums.GNS_LO_DEFAULT, depth+1); err != nil { |
319 | return | 361 | return |
320 | } | 362 | } |
321 | } else { | 363 | } else { |
@@ -342,13 +384,17 @@ func (gns *GNSModule) GetZoneKey(path string) *ed25519.PublicKey { | |||
342 | } | 384 | } |
343 | 385 | ||
344 | // Lookup name in GNS. | 386 | // Lookup name in GNS. |
345 | func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (block *message.GNSBlock, err error) { | 387 | func (gns *GNSModule) Lookup( |
388 | ctx *service.SessionContext, | ||
389 | pkey *ed25519.PublicKey, | ||
390 | label string, | ||
391 | mode int) (block *message.GNSBlock, err error) { | ||
346 | 392 | ||
347 | // create query (lookup key) | 393 | // create query (lookup key) |
348 | query := NewQuery(pkey, label) | 394 | query := NewQuery(pkey, label) |
349 | 395 | ||
350 | // try local lookup first | 396 | // try local lookup first |
351 | if block, err = gns.LookupLocal(query); err != nil { | 397 | if block, err = gns.LookupLocal(ctx, query); err != nil { |
352 | logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n", err.Error()) | 398 | logger.Printf(logger.ERROR, "[gns] local Lookup: %s\n", err.Error()) |
353 | block = nil | 399 | block = nil |
354 | return | 400 | return |
@@ -356,9 +402,17 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (b | |||
356 | if block == nil { | 402 | if block == nil { |
357 | if mode == enums.GNS_LO_DEFAULT { | 403 | if mode == enums.GNS_LO_DEFAULT { |
358 | // get the block from a remote lookup | 404 | // get the block from a remote lookup |
359 | if block, err = gns.LookupRemote(query); err != nil || block == nil { | 405 | if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil { |
360 | if err != nil { | 406 | if err != nil { |
361 | logger.Printf(logger.ERROR, "[gns] remote Lookup: %s\n", err.Error()) | 407 | // check for aborted remote lookup: we need to cancel the query |
408 | if err == transport.ErrChannelInterrupted { | ||
409 | logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") | ||
410 | if err = gns.CancelRemote(ctx, query); err != nil { | ||
411 | logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) | ||
412 | } | ||
413 | } else { | ||
414 | logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error()) | ||
415 | } | ||
362 | block = nil | 416 | block = nil |
363 | } else { | 417 | } else { |
364 | logger.Println(logger.DBG, "[gns] remote Lookup: no block found") | 418 | logger.Println(logger.DBG, "[gns] remote Lookup: no block found") |
@@ -367,8 +421,21 @@ func (gns *GNSModule) Lookup(pkey *ed25519.PublicKey, label string, mode int) (b | |||
367 | return | 421 | return |
368 | } | 422 | } |
369 | // store RRs from remote locally. | 423 | // store RRs from remote locally. |
370 | gns.StoreLocal(block) | 424 | gns.StoreLocal(ctx, block) |
371 | } | 425 | } |
372 | } | 426 | } |
373 | return | 427 | return |
374 | } | 428 | } |
429 | |||
430 | // newLEHORecord creates a new supplemental GNS record of type LEHO. | ||
431 | func (gns *GNSModule) newLEHORecord(name string, expires util.AbsoluteTime) *message.GNSResourceRecord { | ||
432 | rr := new(message.GNSResourceRecord) | ||
433 | rr.Expires = expires | ||
434 | rr.Flags = uint32(enums.GNS_FLAG_SUPPL) | ||
435 | rr.Type = uint32(enums.GNS_TYPE_LEHO) | ||
436 | rr.Size = uint32(len(name) + 1) | ||
437 | rr.Data = make([]byte, rr.Size) | ||
438 | copy(rr.Data, []byte(name)) | ||
439 | rr.Data[len(name)] = 0 | ||
440 | return rr | ||
441 | } | ||
diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go index 1a62e77..2526ae8 100644 --- a/src/gnunet/service/gns/service.go +++ b/src/gnunet/service/gns/service.go | |||
@@ -22,7 +22,6 @@ import ( | |||
22 | "encoding/hex" | 22 | "encoding/hex" |
23 | "fmt" | 23 | "fmt" |
24 | "io" | 24 | "io" |
25 | "sync" | ||
26 | 25 | ||
27 | "gnunet/config" | 26 | "gnunet/config" |
28 | "gnunet/crypto" | 27 | "gnunet/crypto" |
@@ -60,6 +59,7 @@ func NewGNSService() service.Service { | |||
60 | inst.LookupLocal = inst.LookupNamecache | 59 | inst.LookupLocal = inst.LookupNamecache |
61 | inst.StoreLocal = inst.StoreNamecache | 60 | inst.StoreLocal = inst.StoreNamecache |
62 | inst.LookupRemote = inst.LookupDHT | 61 | inst.LookupRemote = inst.LookupDHT |
62 | inst.CancelRemote = inst.CancelDHT | ||
63 | return inst | 63 | return inst |
64 | } | 64 | } |
65 | 65 | ||
@@ -74,15 +74,18 @@ func (s *GNSService) Stop() error { | |||
74 | } | 74 | } |
75 | 75 | ||
76 | // Serve a client channel. | 76 | // Serve a client channel. |
77 | func (s *GNSService) ServeClient(wg *sync.WaitGroup, mc *transport.MsgChannel) { | 77 | func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { |
78 | defer wg.Done() | 78 | |
79 | loop: | 79 | loop: |
80 | for { | 80 | for { |
81 | // receive next message from client | 81 | // receive next message from client |
82 | msg, err := mc.Receive() | 82 | logger.Printf(logger.DBG, "[gns] Waiting for message in session '%d'...\n", ctx.Id) |
83 | msg, err := mc.Receive(ctx.Signaller()) | ||
83 | if err != nil { | 84 | if err != nil { |
84 | if err == io.EOF { | 85 | if err == io.EOF { |
85 | logger.Println(logger.INFO, "[gns] Client channel closed.") | 86 | logger.Println(logger.INFO, "[gns] Client channel closed.") |
87 | } else if err == transport.ErrChannelInterrupted { | ||
88 | logger.Println(logger.INFO, "[gns] Service operation interrupted.") | ||
86 | } else { | 89 | } else { |
87 | logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) | 90 | logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) |
88 | } | 91 | } |
@@ -102,23 +105,28 @@ loop: | |||
102 | resp = respX | 105 | resp = respX |
103 | 106 | ||
104 | // perform lookup on block (locally and remote) | 107 | // perform lookup on block (locally and remote) |
105 | wg.Add(1) | ||
106 | go func() { | 108 | go func() { |
109 | ctx.Add() | ||
107 | defer func() { | 110 | defer func() { |
108 | // send response | 111 | // send response |
109 | if err := mc.Send(resp); err != nil { | 112 | if resp != nil { |
110 | logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) | 113 | if err := mc.Send(resp, ctx.Signaller()); err != nil { |
114 | logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) | ||
115 | } | ||
111 | } | 116 | } |
112 | // go-routine finished | 117 | // go-routine finished |
113 | wg.Done() | 118 | ctx.Remove() |
114 | }() | 119 | }() |
115 | 120 | ||
116 | pkey := ed25519.NewPublicKeyFromBytes(m.Zone) | 121 | pkey := ed25519.NewPublicKeyFromBytes(m.Zone) |
117 | label := m.GetName() | 122 | label := m.GetName() |
118 | kind := NewRRTypeList(int(m.Type)) | 123 | kind := NewRRTypeList(int(m.Type)) |
119 | recset, err := s.Resolve(label, pkey, kind, int(m.Options), 0) | 124 | recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0) |
120 | if err != nil { | 125 | if err != nil { |
121 | logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) | 126 | logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) |
127 | if err == transport.ErrChannelInterrupted { | ||
128 | resp = nil | ||
129 | } | ||
122 | return | 130 | return |
123 | } | 131 | } |
124 | // handle records | 132 | // handle records |
@@ -151,12 +159,16 @@ loop: | |||
151 | break loop | 159 | break loop |
152 | } | 160 | } |
153 | } | 161 | } |
162 | // cancel all tasks running for this session/connection | ||
163 | logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", ctx.Id) | ||
164 | ctx.Cancel() | ||
165 | |||
154 | // close client connection | 166 | // close client connection |
155 | mc.Close() | 167 | mc.Close() |
156 | } | 168 | } |
157 | 169 | ||
158 | // LookupNamecache | 170 | // LookupNamecache |
159 | func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err error) { | 171 | func (s *GNSService) LookupNamecache(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { |
160 | logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key.Bits)) | 172 | logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key.Bits)) |
161 | 173 | ||
162 | // assemble Namecache request | 174 | // assemble Namecache request |
@@ -166,7 +178,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err | |||
166 | 178 | ||
167 | // get response from Namecache service | 179 | // get response from Namecache service |
168 | var resp message.Message | 180 | var resp message.Message |
169 | if resp, err = service.ServiceRequestResponse("gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { | 181 | if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { |
170 | return | 182 | return |
171 | } | 183 | } |
172 | 184 | ||
@@ -219,7 +231,7 @@ func (s *GNSService) LookupNamecache(query *Query) (block *message.GNSBlock, err | |||
219 | } | 231 | } |
220 | 232 | ||
221 | // StoreNamecache | 233 | // StoreNamecache |
222 | func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { | 234 | func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block *message.GNSBlock) (err error) { |
223 | logger.Println(logger.DBG, "[gns] StoreNamecache()...") | 235 | logger.Println(logger.DBG, "[gns] StoreNamecache()...") |
224 | 236 | ||
225 | // assemble Namecache request | 237 | // assemble Namecache request |
@@ -228,7 +240,7 @@ func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { | |||
228 | 240 | ||
229 | // get response from Namecache service | 241 | // get response from Namecache service |
230 | var resp message.Message | 242 | var resp message.Message |
231 | if resp, err = service.ServiceRequestResponse("gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { | 243 | if resp, err = service.ServiceRequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { |
232 | return | 244 | return |
233 | } | 245 | } |
234 | 246 | ||
@@ -255,7 +267,7 @@ func (s *GNSService) StoreNamecache(block *message.GNSBlock) (err error) { | |||
255 | } | 267 | } |
256 | 268 | ||
257 | // LookupDHT | 269 | // LookupDHT |
258 | func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error) { | 270 | func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { |
259 | logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) | 271 | logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) |
260 | 272 | ||
261 | // assemble DHT request | 273 | // assemble DHT request |
@@ -268,7 +280,7 @@ func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error | |||
268 | 280 | ||
269 | // get response from DHT service | 281 | // get response from DHT service |
270 | var resp message.Message | 282 | var resp message.Message |
271 | if resp, err = service.ServiceRequestResponse("gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { | 283 | if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { |
272 | return | 284 | return |
273 | } | 285 | } |
274 | 286 | ||
@@ -313,9 +325,36 @@ func (s *GNSService) LookupDHT(query *Query) (block *message.GNSBlock, err error | |||
313 | 325 | ||
314 | // we got a result from DHT that was not in the namecache, | 326 | // we got a result from DHT that was not in the namecache, |
315 | // so store it there now. | 327 | // so store it there now. |
316 | if err = s.StoreNamecache(block); err != nil { | 328 | if err = s.StoreNamecache(ctx, block); err != nil { |
317 | logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error()) | 329 | logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error()) |
318 | } | 330 | } |
319 | } | 331 | } |
320 | return | 332 | return |
321 | } | 333 | } |
334 | |||
335 | // CancelDHT | ||
336 | func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err error) { | ||
337 | logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) | ||
338 | |||
339 | // assemble DHT request | ||
340 | req := message.NewDHTClientGetStopMsg(query.Key) | ||
341 | req.Id = uint64(util.NextID()) | ||
342 | |||
343 | // get response from DHT service | ||
344 | var resp message.Message | ||
345 | if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { | ||
346 | return | ||
347 | } | ||
348 | |||
349 | // handle message depending on its type | ||
350 | logger.Println(logger.DBG, "[gns] Handling response from DHT service") | ||
351 | switch m := resp.(type) { | ||
352 | case *message.DHTClientResultMsg: | ||
353 | // check for matching IDs | ||
354 | if m.Id != req.Id { | ||
355 | logger.Println(logger.ERROR, "[gns] Got response for unknown ID") | ||
356 | break | ||
357 | } | ||
358 | } | ||
359 | return | ||
360 | } | ||
diff --git a/src/gnunet/service/namecache/module.go b/src/gnunet/service/namecache/module.go index 6a10625..a205444 100644 --- a/src/gnunet/service/namecache/module.go +++ b/src/gnunet/service/namecache/module.go | |||
@@ -20,6 +20,7 @@ package namecache | |||
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "gnunet/message" | 22 | "gnunet/message" |
23 | "gnunet/service" | ||
23 | "gnunet/service/gns" | 24 | "gnunet/service/gns" |
24 | ) | 25 | ) |
25 | 26 | ||
@@ -35,10 +36,10 @@ import ( | |||
35 | type NamecacheModule struct { | 36 | type NamecacheModule struct { |
36 | } | 37 | } |
37 | 38 | ||
38 | func (nc *NamecacheModule) Get(query *gns.Query) (*message.GNSBlock, error) { | 39 | func (nc *NamecacheModule) Get(ctx *service.SessionContext, query *gns.Query) (*message.GNSBlock, error) { |
39 | return nil, nil | 40 | return nil, nil |
40 | } | 41 | } |
41 | 42 | ||
42 | func (nc *NamecacheModule) Put(block *message.GNSBlock) error { | 43 | func (nc *NamecacheModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { |
43 | return nil | 44 | return nil |
44 | } | 45 | } |
diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go index d484a95..1017e0b 100644 --- a/src/gnunet/service/service.go +++ b/src/gnunet/service/service.go | |||
@@ -33,19 +33,21 @@ import ( | |||
33 | // Channel semantics in the specification string. | 33 | // Channel semantics in the specification string. |
34 | type Service interface { | 34 | type Service interface { |
35 | Start(spec string) error | 35 | Start(spec string) error |
36 | ServeClient(wg *sync.WaitGroup, ch *transport.MsgChannel) | 36 | ServeClient(ctx *SessionContext, ch *transport.MsgChannel) |
37 | Stop() error | 37 | Stop() error |
38 | } | 38 | } |
39 | 39 | ||
40 | // ServiceImpl is an implementation of generic service functionality. | 40 | // ServiceImpl is an implementation of generic service functionality. |
41 | type ServiceImpl struct { | 41 | type ServiceImpl struct { |
42 | impl Service | 42 | impl Service // Specific service implementation |
43 | hdlr chan transport.Channel | 43 | hdlr chan transport.Channel // Channel from listener |
44 | ctrl chan bool | 44 | ctrl chan bool // Control channel |
45 | srvc transport.ChannelServer | 45 | drop chan int // Channel to drop a session from pending list |
46 | wg *sync.WaitGroup | 46 | srvc transport.ChannelServer // multi-user service |
47 | name string | 47 | wg *sync.WaitGroup // wait group for go routine synchronization |
48 | running bool | 48 | name string // service name |
49 | running bool // service currently running? | ||
50 | pending map[int]*SessionContext // list of pending sessions | ||
49 | } | 51 | } |
50 | 52 | ||
51 | // NewServiceImpl instantiates a new ServiceImpl object. | 53 | // NewServiceImpl instantiates a new ServiceImpl object. |
@@ -54,10 +56,12 @@ func NewServiceImpl(name string, srv Service) *ServiceImpl { | |||
54 | impl: srv, | 56 | impl: srv, |
55 | hdlr: make(chan transport.Channel), | 57 | hdlr: make(chan transport.Channel), |
56 | ctrl: make(chan bool), | 58 | ctrl: make(chan bool), |
59 | drop: make(chan int), | ||
57 | srvc: nil, | 60 | srvc: nil, |
58 | wg: new(sync.WaitGroup), | 61 | wg: new(sync.WaitGroup), |
59 | name: name, | 62 | name: name, |
60 | running: false, | 63 | running: false, |
64 | pending: make(map[int]*SessionContext), | ||
61 | } | 65 | } |
62 | } | 66 | } |
63 | 67 | ||
@@ -83,6 +87,8 @@ func (si *ServiceImpl) Start(spec string) (err error) { | |||
83 | loop: | 87 | loop: |
84 | for si.running { | 88 | for si.running { |
85 | select { | 89 | select { |
90 | |||
91 | // handle incoming connections | ||
86 | case in := <-si.hdlr: | 92 | case in := <-si.hdlr: |
87 | if in == nil { | 93 | if in == nil { |
88 | logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name) | 94 | logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name) |
@@ -90,14 +96,38 @@ func (si *ServiceImpl) Start(spec string) (err error) { | |||
90 | } | 96 | } |
91 | switch ch := in.(type) { | 97 | switch ch := in.(type) { |
92 | case transport.Channel: | 98 | case transport.Channel: |
93 | logger.Printf(logger.INFO, "[%s] Client connected.\n", si.name) | 99 | // run a new session with context |
94 | si.wg.Add(1) | 100 | ctx := NewSessionContext() |
95 | go si.impl.ServeClient(si.wg, transport.NewMsgChannel(ch)) | 101 | sessId := ctx.Id |
102 | si.pending[sessId] = ctx | ||
103 | logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", si.name, sessId) | ||
104 | |||
105 | go func() { | ||
106 | // serve client on the message channel | ||
107 | si.impl.ServeClient(ctx, transport.NewMsgChannel(ch)) | ||
108 | // session is done now. | ||
109 | logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", si.name, sessId) | ||
110 | si.drop <- sessId | ||
111 | }() | ||
96 | } | 112 | } |
113 | |||
114 | // handle session removal | ||
115 | case sessId := <-si.drop: | ||
116 | delete(si.pending, sessId) | ||
117 | |||
118 | // handle cancelation signal on listener. | ||
97 | case <-si.ctrl: | 119 | case <-si.ctrl: |
98 | break loop | 120 | break loop |
99 | } | 121 | } |
100 | } | 122 | } |
123 | |||
124 | // terminate pending sessions | ||
125 | for _, ctx := range si.pending { | ||
126 | logger.Printf(logger.DBG, "[%s] Session '%d' closing...\n", si.name, ctx.Id) | ||
127 | ctx.Cancel() | ||
128 | } | ||
129 | |||
130 | // close-down service | ||
101 | logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name) | 131 | logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name) |
102 | si.srvc.Close() | 132 | si.srvc.Close() |
103 | si.running = false | 133 | si.running = false |
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go index 092ab8a..4ba49ac 100644 --- a/src/gnunet/transport/channel.go +++ b/src/gnunet/transport/channel.go | |||
@@ -28,6 +28,7 @@ import ( | |||
28 | "gnunet/message" | 28 | "gnunet/message" |
29 | "gnunet/util" | 29 | "gnunet/util" |
30 | 30 | ||
31 | "github.com/bfix/gospel/concurrent" | ||
31 | "github.com/bfix/gospel/data" | 32 | "github.com/bfix/gospel/data" |
32 | "github.com/bfix/gospel/logger" | 33 | "github.com/bfix/gospel/logger" |
33 | ) | 34 | ) |
@@ -36,6 +37,8 @@ import ( | |||
36 | var ( | 37 | var ( |
37 | ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") | 38 | ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") |
38 | ErrChannelNotOpened = fmt.Errorf("Channel not opened") | 39 | ErrChannelNotOpened = fmt.Errorf("Channel not opened") |
40 | ErrChannelInterrupted = fmt.Errorf("Channel interrupted") | ||
41 | ErrChannelClosed = fmt.Errorf("Channel closed") | ||
39 | ) | 42 | ) |
40 | 43 | ||
41 | //////////////////////////////////////////////////////////////////////// | 44 | //////////////////////////////////////////////////////////////////////// |
@@ -49,10 +52,11 @@ var ( | |||
49 | // "tcp+1.2.3.4:5" -- for TCP channels | 52 | // "tcp+1.2.3.4:5" -- for TCP channels |
50 | // "udp+1.2.3.4:5" -- for UDP channels | 53 | // "udp+1.2.3.4:5" -- for UDP channels |
51 | type Channel interface { | 54 | type Channel interface { |
52 | Open(spec string) error | 55 | Open(spec string) error // open channel (for read/write) |
53 | Close() error | 56 | Close() error // close open channel |
54 | Read([]byte) (int, error) | 57 | IsOpen() bool // check if channel is open |
55 | Write([]byte) (int, error) | 58 | Read([]byte, *concurrent.Signaller) (int, error) // read from channel |
59 | Write([]byte, *concurrent.Signaller) (int, error) // write to channel | ||
56 | } | 60 | } |
57 | 61 | ||
58 | // ChannelFactory instantiates specific Channel imülementations. | 62 | // ChannelFactory instantiates specific Channel imülementations. |
@@ -140,7 +144,12 @@ func (c *MsgChannel) Close() error { | |||
140 | } | 144 | } |
141 | 145 | ||
142 | // Send a GNUnet message over a channel. | 146 | // Send a GNUnet message over a channel. |
143 | func (c *MsgChannel) Send(msg message.Message) error { | 147 | func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error { |
148 | |||
149 | // check for closed channel | ||
150 | if !c.ch.IsOpen() { | ||
151 | return ErrChannelClosed | ||
152 | } | ||
144 | 153 | ||
145 | // convert message to binary data | 154 | // convert message to binary data |
146 | data, err := data.Marshal(msg) | 155 | data, err := data.Marshal(msg) |
@@ -160,7 +169,7 @@ func (c *MsgChannel) Send(msg message.Message) error { | |||
160 | } | 169 | } |
161 | 170 | ||
162 | // send packet | 171 | // send packet |
163 | n, err := c.ch.Write(data) | 172 | n, err := c.ch.Write(data, sig) |
164 | if err != nil { | 173 | if err != nil { |
165 | return err | 174 | return err |
166 | } | 175 | } |
@@ -171,9 +180,15 @@ func (c *MsgChannel) Send(msg message.Message) error { | |||
171 | } | 180 | } |
172 | 181 | ||
173 | // Receive GNUnet messages over a plain Channel. | 182 | // Receive GNUnet messages over a plain Channel. |
174 | func (c *MsgChannel) Receive() (message.Message, error) { | 183 | func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) { |
184 | // check for closed channel | ||
185 | if !c.ch.IsOpen() { | ||
186 | return nil, ErrChannelClosed | ||
187 | } | ||
188 | |||
189 | // get bytes from channel | ||
175 | get := func(pos, count int) error { | 190 | get := func(pos, count int) error { |
176 | n, err := c.ch.Read(c.buf[pos : pos+count]) | 191 | n, err := c.ch.Read(c.buf[pos:pos+count], sig) |
177 | if err != nil { | 192 | if err != nil { |
178 | return err | 193 | return err |
179 | } | 194 | } |
@@ -182,6 +197,7 @@ func (c *MsgChannel) Receive() (message.Message, error) { | |||
182 | } | 197 | } |
183 | return nil | 198 | return nil |
184 | } | 199 | } |
200 | |||
185 | if err := get(0, 4); err != nil { | 201 | if err := get(0, 4); err != nil { |
186 | return nil, err | 202 | return nil, err |
187 | } | 203 | } |
diff --git a/src/gnunet/transport/channel_netw.go b/src/gnunet/transport/channel_netw.go index c56c089..4f0ba4a 100644 --- a/src/gnunet/transport/channel_netw.go +++ b/src/gnunet/transport/channel_netw.go | |||
@@ -24,9 +24,30 @@ import ( | |||
24 | "strconv" | 24 | "strconv" |
25 | "strings" | 25 | "strings" |
26 | 26 | ||
27 | "github.com/bfix/gospel/concurrent" | ||
27 | "github.com/bfix/gospel/logger" | 28 | "github.com/bfix/gospel/logger" |
28 | ) | 29 | ) |
29 | 30 | ||
31 | // ChannelResult for read/write operations on channels. | ||
32 | type ChannelResult struct { | ||
33 | count int // number of bytes read/written | ||
34 | err error // error (or nil) | ||
35 | } | ||
36 | |||
37 | // NewChannelResult instanciates a new object with given attributes. | ||
38 | func NewChannelResult(n int, err error) *ChannelResult { | ||
39 | return &ChannelResult{ | ||
40 | count: n, | ||
41 | err: err, | ||
42 | } | ||
43 | } | ||
44 | |||
45 | // Values() returns the attributes of a result instance (for passing up the | ||
46 | // call stack). | ||
47 | func (cr *ChannelResult) Values() (int, error) { | ||
48 | return cr.count, cr.err | ||
49 | } | ||
50 | |||
30 | //////////////////////////////////////////////////////////////////////// | 51 | //////////////////////////////////////////////////////////////////////// |
31 | // Generic network-based Channel | 52 | // Generic network-based Channel |
32 | 53 | ||
@@ -64,27 +85,86 @@ func (c *NetworkChannel) Open(spec string) (err error) { | |||
64 | // Close a network channel | 85 | // Close a network channel |
65 | func (c *NetworkChannel) Close() error { | 86 | func (c *NetworkChannel) Close() error { |
66 | if c.conn != nil { | 87 | if c.conn != nil { |
67 | return c.conn.Close() | 88 | rc := c.conn.Close() |
89 | c.conn = nil | ||
90 | return rc | ||
68 | } | 91 | } |
69 | return ErrChannelNotOpened | 92 | return ErrChannelNotOpened |
70 | } | 93 | } |
71 | 94 | ||
95 | // IsOpen returns true if the channel is opened | ||
96 | func (c *NetworkChannel) IsOpen() bool { | ||
97 | return c.conn != nil | ||
98 | } | ||
99 | |||
72 | // Read bytes from a network channel into buffer: Returns the number of read | 100 | // Read bytes from a network channel into buffer: Returns the number of read |
73 | // bytes and an error code. Only works on open channels ;) | 101 | // bytes and an error code. Only works on open channels ;) |
74 | func (c *NetworkChannel) Read(buf []byte) (int, error) { | 102 | // The read can be aborted by sending 'true' on the cmd interface; the |
103 | // channel is closed after such interruption. | ||
104 | func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, error) { | ||
105 | // check if the channel is open | ||
75 | if c.conn == nil { | 106 | if c.conn == nil { |
76 | return 0, ErrChannelNotOpened | 107 | return 0, ErrChannelNotOpened |
77 | } | 108 | } |
78 | return c.conn.Read(buf) | 109 | // perform operation in go-routine |
110 | result := make(chan *ChannelResult) | ||
111 | go func() { | ||
112 | result <- NewChannelResult(c.conn.Read(buf)) | ||
113 | }() | ||
114 | |||
115 | listener := sig.Listen() | ||
116 | defer sig.Drop(listener) | ||
117 | for { | ||
118 | select { | ||
119 | // handle terminate command | ||
120 | case x := <-listener: | ||
121 | switch val := x.(type) { | ||
122 | case bool: | ||
123 | if val { | ||
124 | c.conn.Close() | ||
125 | c.conn = nil | ||
126 | return 0, ErrChannelInterrupted | ||
127 | } | ||
128 | } | ||
129 | // handle result of read operation | ||
130 | case res := <-result: | ||
131 | return res.Values() | ||
132 | } | ||
133 | } | ||
79 | } | 134 | } |
80 | 135 | ||
81 | // Write buffer to a network channel: Returns the number of written bytes and | 136 | // Write buffer to a network channel: Returns the number of written bytes and |
82 | // an error code. | 137 | // an error code. The write operation can be aborted by sending 'true' on the |
83 | func (c *NetworkChannel) Write(buf []byte) (int, error) { | 138 | // command channel; the network channel is closed after such interrupt. |
139 | func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, error) { | ||
140 | // check if we have an open channel to write to. | ||
84 | if c.conn == nil { | 141 | if c.conn == nil { |
85 | return 0, ErrChannelNotOpened | 142 | return 0, ErrChannelNotOpened |
86 | } | 143 | } |
87 | return c.conn.Write(buf) | 144 | // perform operation in go-routine |
145 | result := make(chan *ChannelResult) | ||
146 | go func() { | ||
147 | result <- NewChannelResult(c.conn.Write(buf)) | ||
148 | }() | ||
149 | |||
150 | listener := sig.Listen() | ||
151 | defer sig.Drop(listener) | ||
152 | for { | ||
153 | select { | ||
154 | // handle terminate command | ||
155 | case x := <-listener: | ||
156 | switch val := x.(type) { | ||
157 | case bool: | ||
158 | if val { | ||
159 | c.conn.Close() | ||
160 | return 0, ErrChannelInterrupted | ||
161 | } | ||
162 | } | ||
163 | // handle result of read operation | ||
164 | case res := <-result: | ||
165 | return res.Values() | ||
166 | } | ||
167 | } | ||
88 | } | 168 | } |
89 | 169 | ||
90 | //////////////////////////////////////////////////////////////////////// | 170 | //////////////////////////////////////////////////////////////////////// |
diff --git a/src/gnunet/transport/connection.go b/src/gnunet/transport/connection.go index e2cfae2..03549fc 100644 --- a/src/gnunet/transport/connection.go +++ b/src/gnunet/transport/connection.go | |||
@@ -21,6 +21,8 @@ package transport | |||
21 | import ( | 21 | import ( |
22 | "gnunet/core" | 22 | "gnunet/core" |
23 | "gnunet/message" | 23 | "gnunet/message" |
24 | |||
25 | "github.com/bfix/gospel/concurrent" | ||
24 | ) | 26 | ) |
25 | 27 | ||
26 | // Connection for communicating peers | 28 | // Connection for communicating peers |
@@ -67,11 +69,11 @@ func (c *Connection) Close() error { | |||
67 | } | 69 | } |
68 | 70 | ||
69 | // Send a message on the connection | 71 | // Send a message on the connection |
70 | func (c *Connection) Send(msg message.Message) error { | 72 | func (c *Connection) Send(msg message.Message, sig *concurrent.Signaller) error { |
71 | return c.ch.Send(msg) | 73 | return c.ch.Send(msg, sig) |
72 | } | 74 | } |
73 | 75 | ||
74 | // Receive a message on the connection | 76 | // Receive a message on the connection |
75 | func (c *Connection) Receive() (message.Message, error) { | 77 | func (c *Connection) Receive(sig *concurrent.Signaller) (message.Message, error) { |
76 | return c.ch.Receive() | 78 | return c.ch.Receive(sig) |
77 | } | 79 | } |
diff --git a/src/gnunet/util/misc.go b/src/gnunet/util/misc.go index afad974..80f5c84 100644 --- a/src/gnunet/util/misc.go +++ b/src/gnunet/util/misc.go | |||
@@ -22,6 +22,7 @@ import ( | |||
22 | "strings" | 22 | "strings" |
23 | ) | 23 | ) |
24 | 24 | ||
25 | // CounterMap | ||
25 | type CounterMap map[interface{}]int | 26 | type CounterMap map[interface{}]int |
26 | 27 | ||
27 | func (cm CounterMap) Add(i interface{}) int { | 28 | func (cm CounterMap) Add(i interface{}) int { |
diff --git a/src/gnunet/util/time.go b/src/gnunet/util/time.go index 3e09791..e1e0e30 100644 --- a/src/gnunet/util/time.go +++ b/src/gnunet/util/time.go | |||
@@ -78,6 +78,25 @@ func (t AbsoluteTime) Expired() bool { | |||
78 | return t.Val < uint64(time.Now().Unix()) | 78 | return t.Val < uint64(time.Now().Unix()) |
79 | } | 79 | } |
80 | 80 | ||
81 | // Compare two times (-1 = (t < t2), 0 = (t == t2), 1 = (t > t2) | ||
82 | func (t AbsoluteTime) Compare(t2 AbsoluteTime) int { | ||
83 | if t.Val == math.MaxUint64 { | ||
84 | if t2.Val == math.MaxUint64 { | ||
85 | return 0 | ||
86 | } | ||
87 | return 1 | ||
88 | } | ||
89 | if t2.Val == math.MaxUint64 { | ||
90 | return -1 | ||
91 | } | ||
92 | if t.Val < t2.Val { | ||
93 | return -1 | ||
94 | } else if t.Val == t2.Val { | ||
95 | return 0 | ||
96 | } | ||
97 | return 1 | ||
98 | } | ||
99 | |||
81 | //---------------------------------------------------------------------- | 100 | //---------------------------------------------------------------------- |
82 | // Relative time | 101 | // Relative time |
83 | //---------------------------------------------------------------------- | 102 | //---------------------------------------------------------------------- |