gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit db7da66be57c6a9df87f3ea1f3cd681539ad9b51
parent 2a2a558aa553aca3394513114d13d4888ae822db
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Wed,  1 Apr 2020 12:23:06 +0200

Fixed DHT cancellation sequence and minor improvements.

Diffstat:
Msrc/gnunet/modules.go | 1-
Msrc/gnunet/service/client.go | 2+-
Msrc/gnunet/service/context.go | 21+++++++++++++++------
Msrc/gnunet/service/dht/module.go | 4----
Msrc/gnunet/service/gns/module.go | 12+-----------
Msrc/gnunet/service/gns/service.go | 136++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/gnunet/transport/channel.go | 12------------
Msrc/gnunet/transport/channel_netw.go | 3---
8 files changed, 90 insertions(+), 101 deletions(-)

diff --git a/src/gnunet/modules.go b/src/gnunet/modules.go @@ -60,6 +60,5 @@ func init() { LookupLocal: Modules.Namecache.Get, StoreLocal: Modules.Namecache.Put, LookupRemote: Modules.DHT.Get, - CancelRemote: Modules.DHT.Cancel, } } diff --git a/src/gnunet/service/client.go b/src/gnunet/service/client.go @@ -69,7 +69,7 @@ func ServiceRequestResponse( req message.Message) (message.Message, error) { // client-connect to the service - logger.Printf(logger.DBG, "[%s] Connect to %s service\n", caller, callee) + logger.Printf(logger.DBG, "[%s] Connecting to %s service...\n", caller, callee) cl, err := NewClient(endp) if err != nil { return nil, err diff --git a/src/gnunet/service/context.go b/src/gnunet/service/context.go @@ -30,17 +30,19 @@ import ( // by a service; the session is handled by the 'ServeClient' method of a // service implementation. type SessionContext struct { - Id int // session identifier - wg *sync.WaitGroup // wait group for the session - sig *concurrent.Signaller // signaller for the session + Id int // session identifier + wg *sync.WaitGroup // wait group for the session + sig *concurrent.Signaller // signaller for the session + pending int // number of pending go-routines } // NewSessionContext instantiates a new session context. func NewSessionContext() *SessionContext { return &SessionContext{ - Id: util.NextID(), - wg: new(sync.WaitGroup), - sig: concurrent.NewSignaller(), + Id: util.NextID(), + wg: new(sync.WaitGroup), + sig: concurrent.NewSignaller(), + pending: 0, } } @@ -55,11 +57,18 @@ func (ctx *SessionContext) Cancel() { // Add a go-routine to the wait group. func (ctx *SessionContext) Add() { ctx.wg.Add(1) + ctx.pending++ } // Remove a go-routine from the wait group. func (ctx *SessionContext) Remove() { ctx.wg.Done() + ctx.pending-- +} + +// Waiting returns the number of waiting go-routines. +func (ctx *SessionContext) Waiting() int { + return ctx.pending } // Signaller returns the working instance for the context. diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -40,10 +40,6 @@ func (nc *DHTModule) Get(ctx *service.SessionContext, query *gns.Query) (*messag return nil, nil } -func (nc *DHTModule) Cancel(ctx *service.SessionContext, query *gns.Query) error { - return nil -} - func (nc *DHTModule) Put(ctx *service.SessionContext, block *message.GNSBlock) error { return nil } diff --git a/src/gnunet/service/gns/module.go b/src/gnunet/service/gns/module.go @@ -27,7 +27,6 @@ import ( "gnunet/enums" "gnunet/message" "gnunet/service" - "gnunet/transport" "gnunet/util" "github.com/bfix/gospel/crypto/ed25519" @@ -115,7 +114,6 @@ type GNSModule struct { LookupLocal func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) StoreLocal func(ctx *service.SessionContext, block *message.GNSBlock) error LookupRemote func(ctx *service.SessionContext, query *Query) (*message.GNSBlock, error) - CancelRemote func(ctx *service.SessionContext, query *Query) error } // Resolve a GNS name with multiple labels. If pkey is not nil, the name @@ -404,15 +402,7 @@ func (gns *GNSModule) Lookup( // get the block from a remote lookup if block, err = gns.LookupRemote(ctx, query); err != nil || block == nil { if err != nil { - // check for aborted remote lookup: we need to cancel the query - if err == transport.ErrChannelInterrupted { - logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") - if err = gns.CancelRemote(ctx, query); err != nil { - logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) - } - } else { - logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error()) - } + logger.Printf(logger.ERROR, "[gns] remote Lookup failed: %s\n", err.Error()) block = nil } else { logger.Println(logger.DBG, "[gns] remote Lookup: no block found") diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go @@ -59,7 +59,6 @@ func NewGNSService() service.Service { inst.LookupLocal = inst.LookupNamecache inst.StoreLocal = inst.StoreNamecache inst.LookupRemote = inst.LookupDHT - inst.CancelRemote = inst.CancelDHT return inst } @@ -76,45 +75,46 @@ func (s *GNSService) Stop() error { // Serve a client channel. func (s *GNSService) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { + reqId := 0 loop: for { // receive next message from client - logger.Printf(logger.DBG, "[gns] Waiting for message in session '%d'...\n", ctx.Id) + reqId++ + logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client request...\n", ctx.Id, reqId) msg, err := mc.Receive(ctx.Signaller()) if err != nil { if err == io.EOF { - logger.Println(logger.INFO, "[gns] Client channel closed.") + logger.Printf(logger.INFO, "[gns:%d:%d] Client channel closed.\n", ctx.Id, reqId) } else if err == transport.ErrChannelInterrupted { - logger.Println(logger.INFO, "[gns] Service operation interrupted.") + logger.Printf(logger.INFO, "[gns:%d:%d] Service operation interrupted.\n", ctx.Id, reqId) } else { - logger.Printf(logger.ERROR, "[gns] Message-receive failed: %s\n", err.Error()) + logger.Printf(logger.ERROR, "[gns:%d:%d] Message-receive failed: %s\n", ctx.Id, reqId, err.Error()) } break loop } - logger.Printf(logger.INFO, "[gns] Received msg: %v\n", msg) + logger.Printf(logger.INFO, "[gns:%d:%d] Received request: %v\n", ctx.Id, reqId, msg) // perform lookup - var resp message.Message switch m := msg.(type) { case *message.GNSLookupMsg: //---------------------------------------------------------- // GNS_LOOKUP //---------------------------------------------------------- - logger.Println(logger.INFO, "[gns] Lookup request received.") - respX := message.NewGNSLookupResultMsg(m.Id) - resp = respX // perform lookup on block (locally and remote) - go func() { + go func(id int, m *message.GNSLookupMsg) { + logger.Printf(logger.INFO, "[gns:%d:%d] Lookup request received.\n", ctx.Id, id) + resp := message.NewGNSLookupResultMsg(m.Id) ctx.Add() defer func() { // send response if resp != nil { if err := mc.Send(resp, ctx.Signaller()); err != nil { - logger.Printf(logger.ERROR, "[gns] Failed to send response: %s\n", err.Error()) + logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to send response: %s\n", ctx.Id, id, err.Error()) } } // go-routine finished + logger.Printf(logger.DBG, "[gns:%d:%d] Lookup request finished.\n", ctx.Id, id) ctx.Remove() }() @@ -123,7 +123,7 @@ loop: kind := NewRRTypeList(int(m.Type)) recset, err := s.Resolve(ctx, label, pkey, kind, int(m.Options), 0) if err != nil { - logger.Printf(logger.ERROR, "[gns] Failed to lookup block: %s\n", err.Error()) + logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to lookup block: %s\n", ctx.Id, id, err.Error()) if err == transport.ErrChannelInterrupted { resp = nil } @@ -131,40 +131,40 @@ loop: } // handle records if recset != nil { - logger.Printf(logger.DBG, "[gns] Received record set with %d entries\n", recset.Count) + logger.Printf(logger.DBG, "[gns:%d:%d] Received record set with %d entries\n", ctx.Id, id, recset.Count) // get records from block if recset.Count == 0 { - logger.Println(logger.WARN, "[gns] No records in block") + logger.Printf(logger.WARN, "[gns:%d:%d] No records in block\n", ctx.Id, id) return } // process records for i, rec := range recset.Records { - logger.Printf(logger.DBG, "[gns] Record #%d: %v\n", i, rec) + logger.Printf(logger.DBG, "[gns:%d:%d] Record #%d: %v\n", ctx.Id, id, i, rec) // is this the record type we are looking for? if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY { // add it to the response message - respX.AddRecord(rec) + resp.AddRecord(rec) } } } - }() + }(reqId, m) default: //---------------------------------------------------------- // UNKNOWN message type received //---------------------------------------------------------- - logger.Printf(logger.ERROR, "[gns] Unhandled message of type (%d)\n", msg.Header().MsgType) + logger.Printf(logger.ERROR, "[gns:%d:%d] Unhandled message of type (%d)\n", ctx.Id, reqId, msg.Header().MsgType) break loop } } - // cancel all tasks running for this session/connection - logger.Printf(logger.INFO, "[gns] Start closing session '%d'...\n", ctx.Id) - ctx.Cancel() - // close client connection mc.Close() + + // cancel all tasks running for this session/connection + logger.Printf(logger.INFO, "[gns:%d] Start closing session... [%d]\n", ctx.Id, ctx.Waiting()) + ctx.Cancel() } // LookupNamecache @@ -269,27 +269,64 @@ func (s *GNSService) StoreNamecache(ctx *service.SessionContext, block *message. // LookupDHT func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.GNSBlock, err error) { logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) - - // assemble DHT request - req := message.NewDHTClientGetMsg(query.Key) - req.Id = uint64(util.NextID()) - req.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL) - req.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD) - req.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE) block = nil - // get response from DHT service - var resp message.Message - if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { - return + // client-connect to the DHT service + logger.Println(logger.DBG, "[gns] Connecting to DHT service...") + cl, err := service.NewClient(config.Cfg.DHT.Endpoint) + if err != nil { + return nil, err + } + defer func() { + logger.Println(logger.DBG, "[gns] Closing connection to DHT service") + cl.Close() + }() + + var ( + // response received from service + resp message.Message + + // request-response interaction with service + interact = func(req message.Message, withResponse bool) (err error) { + // send request + logger.Println(logger.DBG, "[gns] Sending request to DHT service") + if err = cl.SendRequest(ctx, req); err == nil && withResponse { + // wait for a single response + logger.Println(logger.DBG, "[gns] Waiting for response from DHT service") + resp, err = cl.ReceiveResponse(ctx) + } + return + } + ) + + // send DHT GET request and wait for response + reqGet := message.NewDHTClientGetMsg(query.Key) + reqGet.Id = uint64(util.NextID()) + reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL) + reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD) + reqGet.Options = uint32(enums.DHT_RO_DEMULTIPLEX_EVERYWHERE) + + if err = interact(reqGet, true); err != nil { + // check for aborted remote lookup: we need to cancel the query + if err == transport.ErrChannelInterrupted { + logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") + + // send DHT GET_STOP request and terminate + reqStop := message.NewDHTClientGetStopMsg(query.Key) + reqStop.Id = reqGet.Id + if err = interact(reqStop, false); err != nil { + logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) + } + return nil, transport.ErrChannelInterrupted + } } - // handle message depending on its type + // handle response message depending on its type logger.Println(logger.DBG, "[gns] Handling response from DHT service") switch m := resp.(type) { case *message.DHTClientResultMsg: // check for matching IDs - if m.Id != req.Id { + if m.Id != reqGet.Id { logger.Println(logger.ERROR, "[gns] Got response for unknown ID") break } @@ -331,30 +368,3 @@ func (s *GNSService) LookupDHT(ctx *service.SessionContext, query *Query) (block } return } - -// CancelDHT -func (s *GNSService) CancelDHT(ctx *service.SessionContext, query *Query) (err error) { - logger.Printf(logger.DBG, "[gns] CancelDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) - - // assemble DHT request - req := message.NewDHTClientGetStopMsg(query.Key) - req.Id = uint64(util.NextID()) - - // get response from DHT service - var resp message.Message - if resp, err = service.ServiceRequestResponse(ctx, "gns", "DHT", config.Cfg.DHT.Endpoint, req); err != nil { - return - } - - // handle message depending on its type - logger.Println(logger.DBG, "[gns] Handling response from DHT service") - switch m := resp.(type) { - case *message.DHTClientResultMsg: - // check for matching IDs - if m.Id != req.Id { - logger.Println(logger.ERROR, "[gns] Got response for unknown ID") - break - } - } - return -} diff --git a/src/gnunet/transport/channel.go b/src/gnunet/transport/channel.go @@ -38,7 +38,6 @@ var ( ErrChannelNotImplemented = fmt.Errorf("Protocol not implemented") ErrChannelNotOpened = fmt.Errorf("Channel not opened") ErrChannelInterrupted = fmt.Errorf("Channel interrupted") - ErrChannelClosed = fmt.Errorf("Channel closed") ) //////////////////////////////////////////////////////////////////////// @@ -145,12 +144,6 @@ func (c *MsgChannel) Close() error { // Send a GNUnet message over a channel. func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error { - - // check for closed channel - if !c.ch.IsOpen() { - return ErrChannelClosed - } - // convert message to binary data data, err := data.Marshal(msg) if err != nil { @@ -181,11 +174,6 @@ func (c *MsgChannel) Send(msg message.Message, sig *concurrent.Signaller) error // Receive GNUnet messages over a plain Channel. func (c *MsgChannel) Receive(sig *concurrent.Signaller) (message.Message, error) { - // check for closed channel - if !c.ch.IsOpen() { - return nil, ErrChannelClosed - } - // get bytes from channel get := func(pos, count int) error { n, err := c.ch.Read(c.buf[pos:pos+count], sig) diff --git a/src/gnunet/transport/channel_netw.go b/src/gnunet/transport/channel_netw.go @@ -121,8 +121,6 @@ func (c *NetworkChannel) Read(buf []byte, sig *concurrent.Signaller) (int, error switch val := x.(type) { case bool: if val { - c.conn.Close() - c.conn = nil return 0, ErrChannelInterrupted } } @@ -156,7 +154,6 @@ func (c *NetworkChannel) Write(buf []byte, sig *concurrent.Signaller) (int, erro switch val := x.(type) { case bool: if val { - c.conn.Close() return 0, ErrChannelInterrupted } }