diff options
author | Bernd Fix <brf@hoi-polloi.org> | 2020-04-01 12:23:06 +0200 |
---|---|---|
committer | Bernd Fix <brf@hoi-polloi.org> | 2020-04-01 12:23:06 +0200 |
commit | db7da66be57c6a9df87f3ea1f3cd681539ad9b51 (patch) | |
tree | e6f22af1c7b4a18489dea56367fe468b04e8293b /src | |
parent | 2a2a558aa553aca3394513114d13d4888ae822db (diff) | |
download | gnunet-go-db7da66be57c6a9df87f3ea1f3cd681539ad9b51.tar.gz gnunet-go-db7da66be57c6a9df87f3ea1f3cd681539ad9b51.zip |
Fixed DHT cancellation sequence and minor improvements.
Diffstat (limited to 'src')
-rw-r--r-- | src/gnunet/modules.go | 1 | ||||
-rw-r--r-- | src/gnunet/service/client.go | 2 | ||||
-rw-r--r-- | src/gnunet/service/context.go | 21 | ||||
-rw-r--r-- | src/gnunet/service/dht/module.go | 4 | ||||
-rw-r--r-- | src/gnunet/service/gns/module.go | 12 | ||||
-rw-r--r-- | src/gnunet/service/gns/service.go | 136 | ||||
-rw-r--r-- | src/gnunet/transport/channel.go | 12 | ||||
-rw-r--r-- | src/gnunet/transport/channel_netw.go | 3 |
8 files changed, 90 insertions, 101 deletions
diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go index b71eb40..063b914 100644 --- a/src/gnunet/modules.go +++ b/src/gnunet/modules.go | |||
@@ -60,6 +60,5 @@ func init() { | |||
60 | LookupLocal: Modules.Namecache.Get, | 60 | LookupLocal: Modules.Namecache.Get, |
61 | StoreLocal: Modules.Namecache.Put, | 61 | StoreLocal: Modules.Namecache.Put, |
62 | LookupRemote: Modules.DHT.Get, | 62 | LookupRemote: Modules.DHT.Get, |
63 | CancelRemote: Modules.DHT.Cancel, | ||
64 | } | 63 | } |
65 | } | 64 | } |
diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go index fe3fa9e..b153bab 100644 --- a/src/gnunet/service/client.go +++ b/src/gnunet/service/client.go | |||
@@ -69,7 +69,7 @@ func ServiceRequestResponse( | |||
69 | req message.Message) (message.Message, error) { | 69 | req message.Message) (message.Message, error) { |
70 | 70 | ||
71 | // client-connect to the service | 71 | // client-connect to the service |
72 | logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) | 72 | logger.Printf(logger.DBG, "[%s] Connecting to %s service...\n", caller, callee) |
73 | cl, err := NewClient(endp) | 73 | cl, err := NewClient(endp) |
74 | if err != nil { | 74 | if err != nil { |
75 | return nil, err | 75 | return nil, err |
diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go index 4896bd5..ffffafb 100644 --- a/src/gnunet/service/context.go +++ b/src/gnunet/service/context.go | |||
@@ -30,17 +30,19 @@ import ( | |||
30 | // by a service; the session is handled by the 'ServeClient' method of a | 30 | // by a service; the session is handled by the 'ServeClient' method of a |
31 | // service implementation. | 31 | // service implementation. |
32 | type SessionContext struct { | 32 | type SessionContext struct { |
33 | Id int // session identifier | 33 | Id int // session identifier |
34 | wg *sync.WaitGroup // wait group for the session | 34 | wg *sync.WaitGroup // wait group for the session |
35 | sig *concurrent.Signaller // signaller for the session | 35 | sig *concurrent.Signaller // signaller for the session |
36 | pending int // number of pending go-routines | ||
36 | } | 37 | } |
37 | 38 | ||
38 | // NewSessionContext instantiates a new session context. | 39 | // NewSessionContext instantiates a new session context. |
39 | func NewSessionContext() *SessionContext { | 40 | func NewSessionContext() *SessionContext { |
40 | return &SessionContext{ | 41 | return &SessionContext{ |
41 | Id: util.NextID(), | 42 | Id: util.NextID(), |
42 | wg: new(sync.WaitGroup), | 43 | wg: new(sync.WaitGroup), |
43 | sig: concurrent.NewSignaller(), | 44 | sig: concurrent.NewSignaller(), |
45 | pending: 0, | ||
44 | } | 46 | } |
45 | } | 47 | } |
46 | 48 | ||
@@ -55,11 +57,18 @@ func (ctx *SessionContext) Cancel() { | |||
55 | // Add a go-routine to the wait group. | 57 | // Add a go-routine to the wait group. |
56 | func (ctx *SessionContext) Add() { | 58 | func (ctx *SessionContext) Add() { |
57 | ctx.wg.Add(1) | 59 | ctx.wg.Add(1) |
60 | ctx.pending++ | ||
58 | } | 61 | } |
59 | 62 | ||
60 | // Remove a go-routine from the wait group. | 63 | // Remove a go-routine from the wait group. |
61 | func (ctx *SessionContext) Remove() { | 64 | func (ctx *SessionContext) Remove() { |
62 | ctx.wg.Done() | 65 | ctx.wg.Done() |
66 | ctx.pending-- | ||
67 | } | ||
68 | |||
69 | // Waiting returns the number of waiting go-routines. | ||
70 | func (ctx *SessionContext) Waiting() int { | ||
71 | return ctx.pending | ||
63 | } | 72 | } |
64 | 73 | ||
65 | // Signaller returns the working instance for the context. | 74 | // Signaller returns the working instance for the context. |
diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go index a54d0eb..61c2540 100644 --- a/src/gnunet/service/dht/module.go +++ b/src/gnunet/service/dht/module.go | |||
@@ -40,10 +40,6 @@ func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) (*messag | |||
40 | return nil, nil | 40 | return nil, nil |
41 | } | 41 | } |
42 | 42 | ||
43 | func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) error { | ||
44 | return nil | ||
45 | } | ||
46 | |||
47 | func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { | 43 | func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { |
48 | return nil | 44 | return nil |
49 | } | 45 | } |
diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go index 43f7b7a..e885d2d 100644 --- a/src/gnunet/service/gns/module.go +++ b/src/gnunet/service/gns/module.go | |||
@@ -27,7 +27,6 @@ import ( | |||
27 | "gnunet/enums" | 27 | "gnunet/enums" |
28 | "gnunet/message" | 28 | "gnunet/message" |
29 | "gnunet/service" | 29 | "gnunet/service" |
30 | "gnunet/transport" | ||
31 | "gnunet/util" | 30 | "gnunet/util" |
32 | 31 | ||
33 | "github.com/bfix/gospel/crypto/ed25519" | 32 | "github.com/bfix/gospel/crypto/ed25519" |
@@ -115,7 +114,6 @@ type GNSModule struct { | |||
115 | LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) | 114 | LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) |
116 | StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error | 115 | StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error |
117 | LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) | 116 | LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) |
118 | CancelRemote func(ctx *service.SessionContext, query *Query) error | ||
119 | } | 117 | } |
120 | 118 | ||
121 | // Resolve a GNS name with multiple labels. If pkey is not nil, the name | 119 | // Resolve a GNS name with multiple labels. If pkey is not nil, the name |
@@ -404,15 +402,7 @@ func (gns *GNSModule) Lookup( | |||
404 | // get the block from a remote lookup | 402 | // get the block from a remote lookup |
405 | if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil { | 403 | if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil { |
406 | if err != nil { | 404 | if err != nil { |
407 | // check for aborted remote lookup: we need to cancel the query | 405 | logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error()) |
408 | if err == transport.ErrChannelInterrupted { | ||
409 | logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") | ||
410 | if err = gns.CancelRemote(ctx, query); err != nil { | ||
411 | logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) | ||
412 | } | ||
413 | } else { | ||
414 | logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error()) | ||
415 | } | ||
416 | block = nil | 406 | block = nil |
417 | } else { | 407 | } else { |
418 | logger.Println(logger.DBG, "[gns] remote Lookup: no block found") | 408 | logger.Println(logger.DBG, "[gns] remote Lookup: no block found") |
diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go index 2526ae8..464e622 100644 --- a/src/gnunet/service/gns/service.go +++ b/src/gnunet/service/gns/service.go | |||
@@ -59,7 +59,6 @@ func NewGNSService() service.Service { | |||
59 | inst.LookupLocal = inst.LookupNamecache | 59 | inst.LookupLocal = inst.LookupNamecache |
60 | inst.StoreLocal = inst.StoreNamecache | 60 | inst.StoreLocal = inst.StoreNamecache |
61 | inst.LookupRemote = inst.LookupDHT | 61 | inst.LookupRemote = inst.LookupDHT |
62 | inst.CancelRemote = inst.CancelDHT | ||
63 | return inst | 62 | return inst |
64 | } | 63 | } |
65 | 64 | ||
@@ -76,45 +75,46 @@ func (s *GNSService) Stop() error { | |||
76 | // Serve a client channel. | 75 | // Serve a client channel. |
77 | func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { | 76 | func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { |
78 | 77 | ||
78 | reqId := 0 | ||
79 | loop: | 79 | loop: |
80 | for { | 80 | for { |
81 | // receive next message from client | 81 | // receive next message from client |
82 | logger.Printf(logger.DBG, "[gns] Waiting for message in session '%d'...\n", ctx.Id) | 82 | reqId++ |
83 | logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client request...\n", ctx.Id, reqId) | ||
83 | msg, err := mc.Receive(ctx.Signaller()) | 84 | msg, err := mc.Receive(ctx.Signaller()) |
84 | if err != nil { | 85 | if err != nil { |
85 | if err == io.EOF { | 86 | if err == io.EOF { |
86 | logger.Println(logger.INFO, "[gns] Client channel closed.") | 87 | logger.Printf(logger.INFO, "[gns:%d:%d] Client channel closed.\n", ctx.Id, reqId) |
87 | } else if err == transport.ErrChannelInterrupted { | 88 | } else if err == transport.ErrChannelInterrupted { |
88 | logger.Println(logger.INFO, "[gns] Service operation interrupted.") | 89 | logger.Printf(logger.INFO, "[gns:%d:%d] Service operation interrupted.\n", ctx.Id, reqId) |
89 | } else { | 90 | } else { |
90 | logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) | 91 | logger.Printf(logger.ERROR, "[gns:%d:%d] Message-receive failed: %s\n", ctx.Id, reqId, err.Error()) |
91 | } | 92 | } |
92 | break loop | 93 | break loop |
93 | } | 94 | } |
94 | logger.Printf(logger.INFO, "[gns] Received msg: %v\n", msg) | 95 | logger.Printf(logger.INFO, "[gns:%d:%d] Received request: %v\n", ctx.Id, reqId, msg) |
95 | 96 | ||
96 | // perform lookup | 97 | // perform lookup |
97 | var resp message.Message | ||
98 | switch m := msg.(type) { | 98 | switch m := msg.(type) { |
99 | case *message.GNSLookupMsg: | 99 | case *message.GNSLookupMsg: |
100 | //---------------------------------------------------------- | 100 | //---------------------------------------------------------- |
101 | // GNS_LOOKUP | 101 | // GNS_LOOKUP |
102 | //---------------------------------------------------------- | 102 | //---------------------------------------------------------- |
103 | logger.Println(logger.INFO, "[gns] Lookup request received.") | ||
104 | respX := message.NewGNSLookupResultMsg(m.Id) | ||
105 | resp = respX | ||
106 | 103 | ||
107 | // perform lookup on block (locally and remote) | 104 | // perform lookup on block (locally and remote) |
108 | go func() { | 105 | go func(id int, m *message.GNSLookupMsg) { |
106 | logger.Printf(logger.INFO, "[gns:%d:%d] Lookup request received.\n", ctx.Id, id) | ||
107 | resp := message.NewGNSLookupResultMsg(m.Id) | ||
109 | ctx.Add() | 108 | ctx.Add() |
110 | defer func() { | 109 | defer func() { |
111 | // send response | 110 | // send response |
112 | if resp != nil { | 111 | if resp != nil { |
113 | if err := mc.Send(resp, ctx.Signaller()); err != nil { | 112 | if err := mc.Send(resp, ctx.Signaller()); err != nil { |
114 | logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) | 113 | logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to send response: %s\n", ctx.Id, id, err.Error()) |
115 | } | 114 | } |
116 | } | 115 | } |
117 | // go-routine finished | 116 | // go-routine finished |
117 | logger.Printf(logger.DBG, "[gns:%d:%d] Lookup request finished.\n", ctx.Id, id) | ||
118 | ctx.Remove() | 118 | ctx.Remove() |
119 | }() | 119 | }() |
120 | 120 | ||
@@ -123,7 +123,7 @@ loop: | |||
123 | kind := NewRRTypeList(int(m.Type)) | 123 | kind := NewRRTypeList(int(m.Type)) |
124 | recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0) | 124 | recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0) |
125 | if err != nil { | 125 | if err != nil { |
126 | logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) | 126 | logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to lookup block: %s\n", ctx.Id, id, err.Error()) |
127 | if err == transport.ErrChannelInterrupted { | 127 | if err == transport.ErrChannelInterrupted { |
128 | resp = nil | 128 | resp = nil |
129 | } | 129 | } |
@@ -131,40 +131,40 @@ loop: | |||
131 | } | 131 | } |
132 | // handle records | 132 | // handle records |
133 | if recset != nil { | 133 | if recset != nil { |
134 | logger.Printf(logger.DBG, "[gns] Received record set with %d entries\n", recset.Count) | 134 | logger.Printf(logger.DBG, "[gns:%d:%d] Received record set with %d entries\n", ctx.Id, id, recset.Count) |
135 | 135 | ||
136 | // get records from block | 136 | // get records from block |
137 | if recset.Count == 0 { | 137 | if recset.Count == 0 { |
138 | logger.Println(logger.WARN, "[gns] No records in block") | 138 | logger.Printf(logger.WARN, "[gns:%d:%d] No records in block\n", ctx.Id, id) |
139 | return | 139 | return |
140 | } | 140 | } |
141 | // process records | 141 | // process records |
142 | for i, rec := range recset.Records { | 142 | for i, rec := range recset.Records { |
143 | logger.Printf(logger.DBG, "[gns] Record #%d: %v\n", i, rec) | 143 | logger.Printf(logger.DBG, "[gns:%d:%d] Record #%d: %v\n", ctx.Id, id, i, rec) |
144 | 144 | ||
145 | // is this the record type we are looking for? | 145 | // is this the record type we are looking for? |
146 | if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY { | 146 | if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY { |
147 | // add it to the response message | 147 | // add it to the response message |
148 | respX.AddRecord(rec) | 148 | resp.AddRecord(rec) |
149 | } | 149 | } |
150 | } | 150 | } |
151 | } | 151 | } |
152 | }() | 152 | }(reqId, m) |
153 | 153 | ||
154 | default: | 154 | default: |
155 | //---------------------------------------------------------- | 155 | //---------------------------------------------------------- |
156 | // UNKNOWN message type received | 156 | // UNKNOWN message type received |
157 | //---------------------------------------------------------- | 157 | //---------------------------------------------------------- |
158 | logger.Printf(logger.ERROR, "[gns] Unhandled message of type (%d)\n", msg.Header().MsgType) | 158 | logger.Printf(logger.ERROR, "[gns:%d:%d] Unhandled message of type (%d)\n", ctx.Id, reqId, msg.Header().MsgType) |
159 | break loop | 159 | break loop |
160 | } | 160 | } |
161 | } | 161 | } |
162 | // cancel all tasks running for this session/connection | ||
163 | logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", ctx.Id) | ||
164 | ctx.Cancel() | ||
165 | |||
166 | // close client connection | 162 | // close client connection |
167 | mc.Close() | 163 | mc.Close() |
164 | |||
165 | // cancel all tasks running for this session/connection | ||
166 | logger.Printf(logger.INFO, "[gns:%d] Start closing session... [%d]\n", ctx.Id, ctx.Waiting()) | ||
167 | ctx.Cancel() | ||
168 | } | 168 | } |
169 | 169 | ||
170 | // LookupNamecache | 170 | // LookupNamecache |
@@ -269,27 +269,64 @@ func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block *message. | |||
269 | // LookupDHT | 269 | // LookupDHT |
270 | func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { | 270 | func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { |
271 | logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) | 271 | logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) |
272 | |||
273 | // assemble DHT request | ||
274 | req := message.NewDHTClientGetMsg(query.Key) | ||
275 | req.Id = uint64(util.NextID()) | ||
276 | req.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL) | ||
277 | req.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD) | ||
278 | req.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE) | ||
279 | block = nil | 272 | block = nil |
280 | 273 | ||
281 | // get response from DHT service | 274 | // client-connect to the DHT service |
282 | var resp message.Message | 275 | logger.Println(logger.DBG, "[gns] Connecting to DHT service...") |
283 | if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { | 276 | cl, err := service.NewClient(config.Cfg.DHT.Endpoint) |
284 | return | 277 | if err != nil { |
278 | return nil, err | ||
279 | } | ||
280 | defer func() { | ||
281 | logger.Println(logger.DBG, "[gns] Closing connection to DHT service") | ||
282 | cl.Close() | ||
283 | }() | ||
284 | |||
285 | var ( | ||
286 | // response received from service | ||
287 | resp message.Message | ||
288 | |||
289 | // request-response interaction with service | ||
290 | interact = func(req message.Message, withResponse bool) (err error) { | ||
291 | // send request | ||
292 | logger.Println(logger.DBG, "[gns] Sending request to DHT service") | ||
293 | if err = cl.SendRequest(ctx, req); err == nil && withResponse { | ||
294 | // wait for a single response | ||
295 | logger.Println(logger.DBG, "[gns] Waiting for response from DHT service") | ||
296 | resp, err = cl.ReceiveResponse(ctx) | ||
297 | } | ||
298 | return | ||
299 | } | ||
300 | ) | ||
301 | |||
302 | // send DHT GET request and wait for response | ||
303 | reqGet := message.NewDHTClientGetMsg(query.Key) | ||
304 | reqGet.Id = uint64(util.NextID()) | ||
305 | reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL) | ||
306 | reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD) | ||
307 | reqGet.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE) | ||
308 | |||
309 | if err = interact(reqGet, true); err != nil { | ||
310 | // check for aborted remote lookup: we need to cancel the query | ||
311 | if err == transport.ErrChannelInterrupted { | ||
312 | logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") | ||
313 | |||
314 | // send DHT GET_STOP request and terminate | ||
315 | reqStop := message.NewDHTClientGetStopMsg(query.Key) | ||
316 | reqStop.Id = reqGet.Id | ||
317 | if err = interact(reqStop, false); err != nil { | ||
318 | logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) | ||
319 | } | ||
320 | return nil, transport.ErrChannelInterrupted | ||
321 | } | ||
285 | } | 322 | } |
286 | 323 | ||
287 | // handle message depending on its type | 324 | // handle response message depending on its type |
288 | logger.Println(logger.DBG, "[gns] Handling response from DHT service") | 325 | logger.Println(logger.DBG, "[gns] Handling response from DHT service") |
289 | switch m := resp.(type) { | 326 | switch m := resp.(type) { |
290 | case *message.DHTClientResultMsg: | 327 | case *message.DHTClientResultMsg: |
291 | // check for matching IDs | 328 | // check for matching IDs |
292 | if m.Id != req.Id { | 329 | if m.Id != reqGet.Id { |
293 | logger.Println(logger.ERROR, "[gns] Got response for unknown ID") | 330 | logger.Println(logger.ERROR, "[gns] Got response for unknown ID") |
294 | break | 331 | break |
295 | } | 332 | } |
@@ -331,30 +368,3 @@ func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block | |||
331 | } | 368 | } |
332 | return | 369 | return |
333 | } | 370 | } |
334 | |||
335 | // CancelDHT | ||
336 | func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err error) { | ||
337 | logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) | ||
338 | |||
339 | // assemble DHT request | ||
340 | req := message.NewDHTClientGetStopMsg(query.Key) | ||
341 | req.Id = uint64(util.NextID()) | ||
342 | |||
343 | // get response from DHT service | ||
344 | var resp message.Message | ||
345 | if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { | ||
346 | return | ||
347 | } | ||
348 | |||
349 | // handle message depending on its type | ||
350 | logger.Println(logger.DBG, "[gns] Handling response from DHT service") | ||
351 | switch m := resp.(type) { | ||
352 | case *message.DHTClientResultMsg: | ||
353 | // check for matching IDs | ||
354 | if m.Id != req.Id { | ||
355 | logger.Println(logger.ERROR, "[gns] Got response for unknown ID") | ||
356 | break | ||
357 | } | ||
358 | } | ||
359 | return | ||
360 | } | ||
diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go index 4ba49ac..458a063 100644 --- a/src/gnunet/transport/channel.go +++ b/src/gnunet/transport/channel.go | |||
@@ -38,7 +38,6 @@ var ( | |||
38 | ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") | 38 | ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") |
39 | ErrChannelNotOpened = fmt.Errorf("Channel not opened") | 39 | ErrChannelNotOpened = fmt.Errorf("Channel not opened") |
40 | ErrChannelInterrupted = fmt.Errorf("Channel interrupted") | 40 | ErrChannelInterrupted = fmt.Errorf("Channel interrupted") |
41 | ErrChannelClosed = fmt.Errorf("Channel closed") | ||
42 | ) | 41 | ) |
43 | 42 | ||
44 | //////////////////////////////////////////////////////////////////////// | 43 | //////////////////////////////////////////////////////////////////////// |
@@ -145,12 +144,6 @@ func (c *MsgChannel) Close() error { | |||
145 | 144 | ||
146 | // Send a GNUnet message over a channel. | 145 | // Send a GNUnet message over a channel. |
147 | func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error { | 146 | func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error { |
148 | |||
149 | // check for closed channel | ||
150 | if !c.ch.IsOpen() { | ||
151 | return ErrChannelClosed | ||
152 | } | ||
153 | |||
154 | // convert message to binary data | 147 | // convert message to binary data |
155 | data, err := data.Marshal(msg) | 148 | data, err := data.Marshal(msg) |
156 | if err != nil { | 149 | if err != nil { |
@@ -181,11 +174,6 @@ func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error | |||
181 | 174 | ||
182 | // Receive GNUnet messages over a plain Channel. | 175 | // Receive GNUnet messages over a plain Channel. |
183 | func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) { | 176 | func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) { |
184 | // check for closed channel | ||
185 | if !c.ch.IsOpen() { | ||
186 | return nil, ErrChannelClosed | ||
187 | } | ||
188 | |||
189 | // get bytes from channel | 177 | // get bytes from channel |
190 | get := func(pos, count int) error { | 178 | get := func(pos, count int) error { |
191 | n, err := c.ch.Read(c.buf[pos:pos+count], sig) | 179 | n, err := c.ch.Read(c.buf[pos:pos+count], sig) |
diff --git a/src/gnunet/transport/channel_netw.go b/src/gnunet/transport/channel_netw.go index 4f0ba4a..b70faa4 100644 --- a/src/gnunet/transport/channel_netw.go +++ b/src/gnunet/transport/channel_netw.go | |||
@@ -121,8 +121,6 @@ func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, error | |||
121 | switch val := x.(type) { | 121 | switch val := x.(type) { |
122 | case bool: | 122 | case bool: |
123 | if val { | 123 | if val { |
124 | c.conn.Close() | ||
125 | c.conn = nil | ||
126 | return 0, ErrChannelInterrupted | 124 | return 0, ErrChannelInterrupted |
127 | } | 125 | } |
128 | } | 126 | } |
@@ -156,7 +154,6 @@ func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, erro | |||
156 | switch val := x.(type) { | 154 | switch val := x.(type) { |
157 | case bool: | 155 | case bool: |
158 | if val { | 156 | if val { |
159 | c.conn.Close() | ||
160 | return 0, ErrChannelInterrupted | 157 | return 0, ErrChannelInterrupted |
161 | } | 158 | } |
162 | } | 159 | } |