aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBernd Fix <brf@hoi-polloi.org>2020-04-01 12:23:06 +0200
committerBernd Fix <brf@hoi-polloi.org>2020-04-01 12:23:06 +0200
commitdb7da66be57c6a9df87f3ea1f3cd681539ad9b51 (patch)
treee6f22af1c7b4a18489dea56367fe468b04e8293b /src
parent2a2a558aa553aca3394513114d13d4888ae822db (diff)
downloadgnunet-go-db7da66be57c6a9df87f3ea1f3cd681539ad9b51.tar.gz
gnunet-go-db7da66be57c6a9df87f3ea1f3cd681539ad9b51.zip
Fixed DHT cancellation sequence and minor improvements.
Diffstat (limited to 'src')
-rw-r--r--src/gnunet/modules.go1
-rw-r--r--src/gnunet/service/client.go2
-rw-r--r--src/gnunet/service/context.go21
-rw-r--r--src/gnunet/service/dht/module.go4
-rw-r--r--src/gnunet/service/gns/module.go12
-rw-r--r--src/gnunet/service/gns/service.go136
-rw-r--r--src/gnunet/transport/channel.go12
-rw-r--r--src/gnunet/transport/channel_netw.go3
8 files changed, 90 insertions, 101 deletions
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go
index b71eb40..063b914 100644
--- a/src/gnunet/modules.go
+++ b/src/gnunet/modules.go
@@ -60,6 +60,5 @@ func init() {
60 LookupLocal: Modules.Namecache.Get, 60 LookupLocal: Modules.Namecache.Get,
61 StoreLocal: Modules.Namecache.Put, 61 StoreLocal: Modules.Namecache.Put,
62 LookupRemote: Modules.DHT.Get, 62 LookupRemote: Modules.DHT.Get,
63 CancelRemote: Modules.DHT.Cancel,
64 } 63 }
65} 64}
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go
index fe3fa9e..b153bab 100644
--- a/src/gnunet/service/client.go
+++ b/src/gnunet/service/client.go
@@ -69,7 +69,7 @@ func ServiceRequestResponse(
69 req message.Message) (message.Message, error) { 69 req message.Message) (message.Message, error) {
70 70
71 // client-connect to the service 71 // client-connect to the service
72 logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) 72 logger.Printf(logger.DBG, "[%s] Connecting to %s service...\n", caller, callee)
73 cl, err := NewClient(endp) 73 cl, err := NewClient(endp)
74 if err != nil { 74 if err != nil {
75 return nil, err 75 return nil, err
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go
index 4896bd5..ffffafb 100644
--- a/src/gnunet/service/context.go
+++ b/src/gnunet/service/context.go
@@ -30,17 +30,19 @@ import (
30// by a service; the session is handled by the 'ServeClient' method of a 30// by a service; the session is handled by the 'ServeClient' method of a
31// service implementation. 31// service implementation.
32type SessionContext struct { 32type SessionContext struct {
33 Id int // session identifier 33 Id int // session identifier
34 wg *sync.WaitGroup // wait group for the session 34 wg *sync.WaitGroup // wait group for the session
35 sig *concurrent.Signaller // signaller for the session 35 sig *concurrent.Signaller // signaller for the session
36 pending int // number of pending go-routines
36} 37}
37 38
38// NewSessionContext instantiates a new session context. 39// NewSessionContext instantiates a new session context.
39func NewSessionContext() *SessionContext { 40func NewSessionContext() *SessionContext {
40 return &SessionContext{ 41 return &SessionContext{
41 Id: util.NextID(), 42 Id: util.NextID(),
42 wg: new(sync.WaitGroup), 43 wg: new(sync.WaitGroup),
43 sig: concurrent.NewSignaller(), 44 sig: concurrent.NewSignaller(),
45 pending: 0,
44 } 46 }
45} 47}
46 48
@@ -55,11 +57,18 @@ func (ctx *SessionContext) Cancel() {
55// Add a go-routine to the wait group. 57// Add a go-routine to the wait group.
56func (ctx *SessionContext) Add() { 58func (ctx *SessionContext) Add() {
57 ctx.wg.Add(1) 59 ctx.wg.Add(1)
60 ctx.pending++
58} 61}
59 62
60// Remove a go-routine from the wait group. 63// Remove a go-routine from the wait group.
61func (ctx *SessionContext) Remove() { 64func (ctx *SessionContext) Remove() {
62 ctx.wg.Done() 65 ctx.wg.Done()
66 ctx.pending--
67}
68
69// Waiting returns the number of waiting go-routines.
70func (ctx *SessionContext) Waiting() int {
71 return ctx.pending
63} 72}
64 73
65// Signaller returns the working instance for the context. 74// Signaller returns the working instance for the context.
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go
index a54d0eb..61c2540 100644
--- a/src/gnunet/service/dht/module.go
+++ b/src/gnunet/service/dht/module.go
@@ -40,10 +40,6 @@ func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) (*messag
40 return nil, nil 40 return nil, nil
41} 41}
42 42
43func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) error {
44 return nil
45}
46
47func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { 43func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error {
48 return nil 44 return nil
49} 45}
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go
index 43f7b7a..e885d2d 100644
--- a/src/gnunet/service/gns/module.go
+++ b/src/gnunet/service/gns/module.go
@@ -27,7 +27,6 @@ import (
27 "gnunet/enums" 27 "gnunet/enums"
28 "gnunet/message" 28 "gnunet/message"
29 "gnunet/service" 29 "gnunet/service"
30 "gnunet/transport"
31 "gnunet/util" 30 "gnunet/util"
32 31
33 "github.com/bfix/gospel/crypto/ed25519" 32 "github.com/bfix/gospel/crypto/ed25519"
@@ -115,7 +114,6 @@ type GNSModule struct {
115 LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) 114 LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error)
116 StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error 115 StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error
117 LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) 116 LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error)
118 CancelRemote func(ctx *service.SessionContext, query *Query) error
119} 117}
120 118
121// Resolve a GNS name with multiple labels. If pkey is not nil, the name 119// Resolve a GNS name with multiple labels. If pkey is not nil, the name
@@ -404,15 +402,7 @@ func (gns *GNSModule) Lookup(
404 // get the block from a remote lookup 402 // get the block from a remote lookup
405 if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil { 403 if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil {
406 if err != nil { 404 if err != nil {
407 // check for aborted remote lookup: we need to cancel the query 405 logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error())
408 if err == transport.ErrChannelInterrupted {
409 logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.")
410 if err = gns.CancelRemote(ctx, query); err != nil {
411 logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error())
412 }
413 } else {
414 logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error())
415 }
416 block = nil 406 block = nil
417 } else { 407 } else {
418 logger.Println(logger.DBG, "[gns] remote Lookup: no block found") 408 logger.Println(logger.DBG, "[gns] remote Lookup: no block found")
diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go
index 2526ae8..464e622 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -59,7 +59,6 @@ func NewGNSService() service.Service {
59 inst.LookupLocal = inst.LookupNamecache 59 inst.LookupLocal = inst.LookupNamecache
60 inst.StoreLocal = inst.StoreNamecache 60 inst.StoreLocal = inst.StoreNamecache
61 inst.LookupRemote = inst.LookupDHT 61 inst.LookupRemote = inst.LookupDHT
62 inst.CancelRemote = inst.CancelDHT
63 return inst 62 return inst
64} 63}
65 64
@@ -76,45 +75,46 @@ func (s *GNSService) Stop() error {
76// Serve a client channel. 75// Serve a client channel.
77func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { 76func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) {
78 77
78 reqId := 0
79loop: 79loop:
80 for { 80 for {
81 // receive next message from client 81 // receive next message from client
82 logger.Printf(logger.DBG, "[gns] Waiting for message in session '%d'...\n", ctx.Id) 82 reqId++
83 logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client request...\n", ctx.Id, reqId)
83 msg, err := mc.Receive(ctx.Signaller()) 84 msg, err := mc.Receive(ctx.Signaller())
84 if err != nil { 85 if err != nil {
85 if err == io.EOF { 86 if err == io.EOF {
86 logger.Println(logger.INFO, "[gns] Client channel closed.") 87 logger.Printf(logger.INFO, "[gns:%d:%d] Client channel closed.\n", ctx.Id, reqId)
87 } else if err == transport.ErrChannelInterrupted { 88 } else if err == transport.ErrChannelInterrupted {
88 logger.Println(logger.INFO, "[gns] Service operation interrupted.") 89 logger.Printf(logger.INFO, "[gns:%d:%d] Service operation interrupted.\n", ctx.Id, reqId)
89 } else { 90 } else {
90 logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) 91 logger.Printf(logger.ERROR, "[gns:%d:%d] Message-receive failed: %s\n", ctx.Id, reqId, err.Error())
91 } 92 }
92 break loop 93 break loop
93 } 94 }
94 logger.Printf(logger.INFO, "[gns] Received msg: %v\n", msg) 95 logger.Printf(logger.INFO, "[gns:%d:%d] Received request: %v\n", ctx.Id, reqId, msg)
95 96
96 // perform lookup 97 // perform lookup
97 var resp message.Message
98 switch m := msg.(type) { 98 switch m := msg.(type) {
99 case *message.GNSLookupMsg: 99 case *message.GNSLookupMsg:
100 //---------------------------------------------------------- 100 //----------------------------------------------------------
101 // GNS_LOOKUP 101 // GNS_LOOKUP
102 //---------------------------------------------------------- 102 //----------------------------------------------------------
103 logger.Println(logger.INFO, "[gns] Lookup request received.")
104 respX := message.NewGNSLookupResultMsg(m.Id)
105 resp = respX
106 103
107 // perform lookup on block (locally and remote) 104 // perform lookup on block (locally and remote)
108 go func() { 105 go func(id int, m *message.GNSLookupMsg) {
106 logger.Printf(logger.INFO, "[gns:%d:%d] Lookup request received.\n", ctx.Id, id)
107 resp := message.NewGNSLookupResultMsg(m.Id)
109 ctx.Add() 108 ctx.Add()
110 defer func() { 109 defer func() {
111 // send response 110 // send response
112 if resp != nil { 111 if resp != nil {
113 if err := mc.Send(resp, ctx.Signaller()); err != nil { 112 if err := mc.Send(resp, ctx.Signaller()); err != nil {
114 logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) 113 logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to send response: %s\n", ctx.Id, id, err.Error())
115 } 114 }
116 } 115 }
117 // go-routine finished 116 // go-routine finished
117 logger.Printf(logger.DBG, "[gns:%d:%d] Lookup request finished.\n", ctx.Id, id)
118 ctx.Remove() 118 ctx.Remove()
119 }() 119 }()
120 120
@@ -123,7 +123,7 @@ loop:
123 kind := NewRRTypeList(int(m.Type)) 123 kind := NewRRTypeList(int(m.Type))
124 recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0) 124 recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0)
125 if err != nil { 125 if err != nil {
126 logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) 126 logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to lookup block: %s\n", ctx.Id, id, err.Error())
127 if err == transport.ErrChannelInterrupted { 127 if err == transport.ErrChannelInterrupted {
128 resp = nil 128 resp = nil
129 } 129 }
@@ -131,40 +131,40 @@ loop:
131 } 131 }
132 // handle records 132 // handle records
133 if recset != nil { 133 if recset != nil {
134 logger.Printf(logger.DBG, "[gns] Received record set with %d entries\n", recset.Count) 134 logger.Printf(logger.DBG, "[gns:%d:%d] Received record set with %d entries\n", ctx.Id, id, recset.Count)
135 135
136 // get records from block 136 // get records from block
137 if recset.Count == 0 { 137 if recset.Count == 0 {
138 logger.Println(logger.WARN, "[gns] No records in block") 138 logger.Printf(logger.WARN, "[gns:%d:%d] No records in block\n", ctx.Id, id)
139 return 139 return
140 } 140 }
141 // process records 141 // process records
142 for i, rec := range recset.Records { 142 for i, rec := range recset.Records {
143 logger.Printf(logger.DBG, "[gns] Record #%d: %v\n", i, rec) 143 logger.Printf(logger.DBG, "[gns:%d:%d] Record #%d: %v\n", ctx.Id, id, i, rec)
144 144
145 // is this the record type we are looking for? 145 // is this the record type we are looking for?
146 if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY { 146 if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY {
147 // add it to the response message 147 // add it to the response message
148 respX.AddRecord(rec) 148 resp.AddRecord(rec)
149 } 149 }
150 } 150 }
151 } 151 }
152 }() 152 }(reqId, m)
153 153
154 default: 154 default:
155 //---------------------------------------------------------- 155 //----------------------------------------------------------
156 // UNKNOWN message type received 156 // UNKNOWN message type received
157 //---------------------------------------------------------- 157 //----------------------------------------------------------
158 logger.Printf(logger.ERROR, "[gns] Unhandled message of type (%d)\n", msg.Header().MsgType) 158 logger.Printf(logger.ERROR, "[gns:%d:%d] Unhandled message of type (%d)\n", ctx.Id, reqId, msg.Header().MsgType)
159 break loop 159 break loop
160 } 160 }
161 } 161 }
162 // cancel all tasks running for this session/connection
163 logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", ctx.Id)
164 ctx.Cancel()
165
166 // close client connection 162 // close client connection
167 mc.Close() 163 mc.Close()
164
165 // cancel all tasks running for this session/connection
166 logger.Printf(logger.INFO, "[gns:%d] Start closing session... [%d]\n", ctx.Id, ctx.Waiting())
167 ctx.Cancel()
168} 168}
169 169
170// LookupNamecache 170// LookupNamecache
@@ -269,27 +269,64 @@ func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block *message.
269// LookupDHT 269// LookupDHT
270func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { 270func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) {
271 logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) 271 logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits))
272
273 // assemble DHT request
274 req := message.NewDHTClientGetMsg(query.Key)
275 req.Id = uint64(util.NextID())
276 req.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
277 req.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
278 req.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
279 block = nil 272 block = nil
280 273
281 // get response from DHT service 274 // client-connect to the DHT service
282 var resp message.Message 275 logger.Println(logger.DBG, "[gns] Connecting to DHT service...")
283 if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { 276 cl, err := service.NewClient(config.Cfg.DHT.Endpoint)
284 return 277 if err != nil {
278 return nil, err
279 }
280 defer func() {
281 logger.Println(logger.DBG, "[gns] Closing connection to DHT service")
282 cl.Close()
283 }()
284
285 var (
286 // response received from service
287 resp message.Message
288
289 // request-response interaction with service
290 interact = func(req message.Message, withResponse bool) (err error) {
291 // send request
292 logger.Println(logger.DBG, "[gns] Sending request to DHT service")
293 if err = cl.SendRequest(ctx, req); err == nil && withResponse {
294 // wait for a single response
295 logger.Println(logger.DBG, "[gns] Waiting for response from DHT service")
296 resp, err = cl.ReceiveResponse(ctx)
297 }
298 return
299 }
300 )
301
302 // send DHT GET request and wait for response
303 reqGet := message.NewDHTClientGetMsg(query.Key)
304 reqGet.Id = uint64(util.NextID())
305 reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
306 reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
307 reqGet.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE)
308
309 if err = interact(reqGet, true); err != nil {
310 // check for aborted remote lookup: we need to cancel the query
311 if err == transport.ErrChannelInterrupted {
312 logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.")
313
314 // send DHT GET_STOP request and terminate
315 reqStop := message.NewDHTClientGetStopMsg(query.Key)
316 reqStop.Id = reqGet.Id
317 if err = interact(reqStop, false); err != nil {
318 logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error())
319 }
320 return nil, transport.ErrChannelInterrupted
321 }
285 } 322 }
286 323
287 // handle message depending on its type 324 // handle response message depending on its type
288 logger.Println(logger.DBG, "[gns] Handling response from DHT service") 325 logger.Println(logger.DBG, "[gns] Handling response from DHT service")
289 switch m := resp.(type) { 326 switch m := resp.(type) {
290 case *message.DHTClientResultMsg: 327 case *message.DHTClientResultMsg:
291 // check for matching IDs 328 // check for matching IDs
292 if m.Id != req.Id { 329 if m.Id != reqGet.Id {
293 logger.Println(logger.ERROR, "[gns] Got response for unknown ID") 330 logger.Println(logger.ERROR, "[gns] Got response for unknown ID")
294 break 331 break
295 } 332 }
@@ -331,30 +368,3 @@ func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block
331 } 368 }
332 return 369 return
333} 370}
334
335// CancelDHT
336func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err error) {
337 logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", hex.EncodeToString(query.Key.Bits))
338
339 // assemble DHT request
340 req := message.NewDHTClientGetStopMsg(query.Key)
341 req.Id = uint64(util.NextID())
342
343 // get response from DHT service
344 var resp message.Message
345 if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil {
346 return
347 }
348
349 // handle message depending on its type
350 logger.Println(logger.DBG, "[gns] Handling response from DHT service")
351 switch m := resp.(type) {
352 case *message.DHTClientResultMsg:
353 // check for matching IDs
354 if m.Id != req.Id {
355 logger.Println(logger.ERROR, "[gns] Got response for unknown ID")
356 break
357 }
358 }
359 return
360}
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go
index 4ba49ac..458a063 100644
--- a/src/gnunet/transport/channel.go
+++ b/src/gnunet/transport/channel.go
@@ -38,7 +38,6 @@ var (
38 ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") 38 ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented")
39 ErrChannelNotOpened = fmt.Errorf("Channel not opened") 39 ErrChannelNotOpened = fmt.Errorf("Channel not opened")
40 ErrChannelInterrupted = fmt.Errorf("Channel interrupted") 40 ErrChannelInterrupted = fmt.Errorf("Channel interrupted")
41 ErrChannelClosed = fmt.Errorf("Channel closed")
42) 41)
43 42
44//////////////////////////////////////////////////////////////////////// 43////////////////////////////////////////////////////////////////////////
@@ -145,12 +144,6 @@ func (c *MsgChannel) Close() error {
145 144
146// Send a GNUnet message over a channel. 145// Send a GNUnet message over a channel.
147func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error { 146func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error {
148
149 // check for closed channel
150 if !c.ch.IsOpen() {
151 return ErrChannelClosed
152 }
153
154 // convert message to binary data 147 // convert message to binary data
155 data, err := data.Marshal(msg) 148 data, err := data.Marshal(msg)
156 if err != nil { 149 if err != nil {
@@ -181,11 +174,6 @@ func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error
181 174
182// Receive GNUnet messages over a plain Channel. 175// Receive GNUnet messages over a plain Channel.
183func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) { 176func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) {
184 // check for closed channel
185 if !c.ch.IsOpen() {
186 return nil, ErrChannelClosed
187 }
188
189 // get bytes from channel 177 // get bytes from channel
190 get := func(pos, count int) error { 178 get := func(pos, count int) error {
191 n, err := c.ch.Read(c.buf[pos:pos+count], sig) 179 n, err := c.ch.Read(c.buf[pos:pos+count], sig)
diff --git a/src/gnunet/transport/channel_netw.go b/src/gnunet/transport/channel_netw.go
index 4f0ba4a..b70faa4 100644
--- a/src/gnunet/transport/channel_netw.go
+++ b/src/gnunet/transport/channel_netw.go
@@ -121,8 +121,6 @@ func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, error
121 switch val := x.(type) { 121 switch val := x.(type) {
122 case bool: 122 case bool:
123 if val { 123 if val {
124 c.conn.Close()
125 c.conn = nil
126 return 0, ErrChannelInterrupted 124 return 0, ErrChannelInterrupted
127 } 125 }
128 } 126 }
@@ -156,7 +154,6 @@ func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, erro
156 switch val := x.(type) { 154 switch val := x.(type) {
157 case bool: 155 case bool:
158 if val { 156 if val {
159 c.conn.Close()
160 return 0, ErrChannelInterrupted 157 return 0, ErrChannelInterrupted
161 } 158 }
162 } 159 }