gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

commit 8f8feaf176e62f14a6d449c0a2fb6f0ca76b22b8
parent 22787606b45af3c36aebc4eb48c353f61aef9349
Author: Bernd Fix <brf@hoi-polloi.org>
Date:   Sat, 11 Jun 2022 15:45:45 +0200

RC2 for Milestone 1 (NGI Assure)

Diffstat:
Msrc/gnunet/cmd/gnunet-service-dht-go/main.go | 31++++++++++++++++++++++++++++++-
Msrc/gnunet/core/core.go | 156+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Msrc/gnunet/core/core_test.go | 2+-
Msrc/gnunet/go.mod | 5++---
Msrc/gnunet/go.sum | 7++++---
Msrc/gnunet/message/msg_hello.go | 9++++++++-
Msrc/gnunet/message/msg_hello_dht.go | 75++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Msrc/gnunet/service/dht/module.go | 15+++++++++++++++
Msrc/gnunet/service/dht/service.go | 2+-
Msrc/gnunet/service/rpc.go | 13+++++++------
Msrc/gnunet/test/gnunet-dhtu/main.go | 80++++++++++++++++++++++++++++++++++---------------------------------------------
Msrc/gnunet/transport/endpoint.go | 64+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Msrc/gnunet/transport/reader_writer.go | 23+++++++++++++++++++++++
Asrc/gnunet/util/address_test.go | 54++++++++++++++++++++++++++++++++++++++++++++++++++++++
14 files changed, 393 insertions(+), 143 deletions(-)

diff --git a/src/gnunet/cmd/gnunet-service-dht-go/main.go b/src/gnunet/cmd/gnunet-service-dht-go/main.go @@ -32,6 +32,8 @@ import ( "gnunet/core" "gnunet/service" "gnunet/service/dht" + "gnunet/service/dht/blocks" + "gnunet/util" "github.com/bfix/gospel/logger" ) @@ -72,7 +74,7 @@ func main() { socket = config.Cfg.GNS.Service.Socket } params := make(map[string]string) - if len(param) == 0 { + if len(param) > 0 { for _, p := range strings.Split(param, ",") { kv := strings.SplitN(p, "=", 2) params[kv[0]] = kv[1] @@ -121,6 +123,33 @@ func main() { dhtSrv.InitRPC(rpc) } + // handle bootstrap: collect known addresses + bsList := make([]*util.Address, 0) + for _, bs := range config.Cfg.Bootstrap.Nodes { + // check for HELLO URL + if strings.HasPrefix(bs, "gnunet://hello/") { + var hb *blocks.HelloBlock + if hb, err = blocks.ParseHelloURL(bs, true); err != nil { + logger.Printf(logger.ERROR, "[dht] failed bootstrap HELLO URL %s: %s", bs, err.Error()) + continue + } + // append HELLO addresses + bsList = append(bsList, hb.Addresses()...) + } else { + // parse address directly + var addr *util.Address + if addr, err = util.ParseAddress(bs); err != nil { + logger.Printf(logger.ERROR, "[dht] failed bootstrap address %s: %s", bs, err.Error()) + continue + } + bsList = append(bsList, addr) + } + } + // send HELLO to all bootstrap addresses + for _, addr := range bsList { + c.SendHello(ctx, addr) + } + // handle OS signals sigCh := make(chan os.Signal, 5) signal.Notify(sigCh) diff --git a/src/gnunet/core/core.go b/src/gnunet/core/core.go @@ -38,6 +38,7 @@ import ( var ( ErrCoreNoUpnpDyn = errors.New("no dynamic port with UPnP") ErrCoreNoEndpAddr = errors.New("no endpoint for address") + ErrCoreNotSent = errors.New("message not sent") ) //---------------------------------------------------------------------- @@ -69,6 +70,9 @@ type Core struct { // List of registered endpoints endpoints map[string]*EndpointRef + + // last HELLO message used; re-create if expired + lastHello *message.HelloDHTMsg } //---------------------------------------------------------------------- @@ -159,12 +163,13 @@ func (c *Core) pump(ctx context.Context) { select { // get (next) message from transport case tm := <-c.incoming: - var ev *Event + logger.Printf(logger.DBG, "[core] Message received from %s: %s", tm.Peer, transport.Dump(tm.Msg, "json")) // inspect message for peer state events + var ev *Event switch msg := tm.Msg.(type) { case *message.HelloDHTMsg: - logger.Println(logger.INFO, "[core] Received HELLO message: "+msg.String()) + // verify integrity of message if ok, err := msg.Verify(tm.Peer); !ok || err != nil { logger.Println(logger.WARN, "[core] Received invalid DHT_P2P_HELLO message") @@ -176,9 +181,11 @@ func (c *Core) pump(ctx context.Context) { logger.Println(logger.WARN, "[core] Failed to parse addresses from DHT_P2P_HELLO message") break } - for _, addr := range aList { - c.Learn(ctx, tm.Peer, addr.Wrap()) + if err := c.Learn(ctx, tm.Peer, aList); err != nil { + logger.Println(logger.WARN, "[core] Failed to learn addresses from DHT_P2P_HELLO message: "+err.Error()) + break } + // generate EV_CONNECT event ev = &Event{ ID: EV_CONNECT, @@ -222,21 +229,34 @@ func (c *Core) Shutdown() { // Send is a function that allows the local peer to send a protocol // message to a remote peer. -func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) error { - // get peer label (id or "@") - label := "@" - if peer != nil { - label = peer.String() - } +func (c *Core) Send(ctx context.Context, peer *util.PeerID, msg message.Message) (err error) { // TODO: select best endpoint protocol for transport; now fixed to IP+UDP netw := "ip+udp" - addrs := c.peers.Get(label, netw) - if len(addrs) == 0 { - return ErrCoreNoEndpAddr + + // try all addresses for peer + aList := c.peers.Get(peer.String(), netw) + maybe := false // message may be sent... + for _, addr := range aList { + logger.Printf(logger.WARN, "[core] Trying to send to %s", addr.URI()) + // send message to address + if err = c.send(ctx, addr, msg); err != nil { + // if it is possible that the message was not sent, try next address + if err != transport.ErrEndpMaybeSent { + logger.Printf(logger.WARN, "[core] Failed to send to %s: %s", addr.URI(), err.Error()) + } else { + maybe = true + } + continue + } + // one successful send is enough + return } - // TODO: select best address; curently selects first - addr := addrs[0] - return c.send(ctx, addr, msg) + if maybe { + err = transport.ErrEndpMaybeSent + } else { + err = ErrCoreNotSent + } + return } // send message directly to address @@ -247,40 +267,82 @@ func (c *Core) send(ctx context.Context, addr *util.Address, msg message.Message return c.trans.Send(ctx, addr, tm) } -// Learn a (new) address for peer -func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) (err error) { - // assemble our own HELLO message: - addrList := make([]*util.Address, 0) - for _, epRef := range c.endpoints { - addrList = append(addrList, epRef.addr) - } - node := c.local - var hello *blocks.HelloBlock - hello, err = node.HelloData(time.Hour, addrList) - if err != nil { - return +// Learn (new) addresses for peer +func (c *Core) Learn(ctx context.Context, peer *util.PeerID, addrs []*util.Address) (err error) { + // learn all addresses for peer + newPeer := false + for _, addr := range addrs { + logger.Printf(logger.INFO, "[core] Learning %s for %s (expires %s)", addr.URI(), peer, addr.Expires) + newPeer = (c.peers.Add(peer.String(), addr) == 1) || newPeer } - msg := message.NewHelloDHTMsg() - var aList []*message.HelloAddress - msg.NumAddr = uint16(len(hello.Addresses())) - for _, a := range hello.Addresses() { - ha := message.NewHelloAddress(a) - aList = append(aList, ha) - } - msg.SetAddresses(aList) - - // if no peer is given, we send HELLO directly to address - if peer == nil { - return c.send(ctx, addr, msg) - } - // add peer address to address list - if c.peers.Add(peer.String(), addr) == 1 { + // new peer detected? + if newPeer { // we added a previously unknown peer: send a HELLO + var msg *message.HelloDHTMsg + if msg, err = c.getHello(); err != nil { + return + } + logger.Printf(logger.INFO, "[core] Sending HELLO to %s: %s", peer, msg) err = c.Send(ctx, peer, msg) + // no error if the message might have been sent + if err == transport.ErrEndpMaybeSent { + err = nil + } } return } +// Send the currently active HELLO to given network address +func (c *Core) SendHello(ctx context.Context, addr *util.Address) (err error) { + // get (buffered) HELLO + var msg *message.HelloDHTMsg + if msg, err = c.getHello(); err != nil { + return + } + logger.Printf(logger.INFO, "[core] Sending HELLO to %s: %s", addr.URI(), msg) + return c.send(ctx, addr, msg) +} + +// get the recent HELLO if it is defined and not expired; +// create a new HELLO otherwise. +func (c *Core) getHello() (msg *message.HelloDHTMsg, err error) { + if c.lastHello == nil || c.lastHello.Expires.Expired() { + // assemble new HELLO message + addrList := make([]*util.Address, 0) + for _, epRef := range c.endpoints { + addrList = append(addrList, epRef.addr) + } + node := c.local + var hello *blocks.HelloBlock + hello, err = node.HelloData(time.Hour, addrList) + if err != nil { + return + } + msg = message.NewHelloDHTMsg() + msg.NumAddr = uint16(len(hello.Addresses())) + msg.SetAddresses(hello.Addresses()) + if err = msg.Sign(c.local.prv); err != nil { + return + } + // save for later use + c.lastHello = msg + + // DEBUG + var ok bool + if ok, err = msg.Verify(c.PeerID()); !ok || err != nil { + if !ok { + err = errors.New("[core] failed to verify own HELLO") + } + logger.Println(logger.ERROR, err.Error()) + return + } + logger.Println(logger.DBG, "[core] New HELLO: "+transport.Dump(msg, "json")) + return + } + // we have a valid HELLO for re-use. + return c.lastHello, nil +} + // Addresses returns the list of listening endpoint addresses func (c *Core) Addresses() (list []*util.Address, err error) { for _, epRef := range c.endpoints { @@ -308,14 +370,6 @@ func (c *Core) PeerID() *util.PeerID { // When the connection attempt is successful, information on the new // peer is offered through the PEER_CONNECTED signal. func (c *Core) TryConnect(peer *util.PeerID, addr net.Addr) error { - // select endpoint for address - if ep := c.findEndpoint(peer, addr); ep == nil { - return transport.ErrTransNoEndpoint - } - return nil -} - -func (c *Core) findEndpoint(peer *util.PeerID, addr net.Addr) transport.Endpoint { return nil } diff --git a/src/gnunet/core/core_test.go b/src/gnunet/core/core_test.go @@ -247,7 +247,7 @@ func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Addr label = peer.String() } n.t.Logf("[%d] Learning %s for %s", n.id, addr.StringAll(), label) - if err := n.core.Learn(ctx, peer, addr); err != nil { + if err := n.core.Learn(ctx, peer, []*util.Address{addr}); err != nil { n.t.Log("Learn: " + err.Error()) } } diff --git a/src/gnunet/go.mod b/src/gnunet/go.mod @@ -16,10 +16,9 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/huin/goupnp v1.0.0 // indirect - golang.org/x/mod v0.4.2 // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 // indirect - golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + golang.org/x/tools v0.1.11 // indirect ) diff --git a/src/gnunet/go.sum b/src/gnunet/go.sum @@ -27,8 +27,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898 h1:SLP7Q4Di66FONjDJbCYrCRrh97focO6sLogHO7/g8F0= golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -59,11 +60,11 @@ golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2 h1:BonxutuHCTL0rBDnZlKjpGIQFTjyUVTexFOdWkB6Fg0= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY= +golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/src/gnunet/message/msg_hello.go b/src/gnunet/message/msg_hello.go @@ -24,6 +24,7 @@ import ( "fmt" "gnunet/util" "io" + "time" ) //---------------------------------------------------------------------- @@ -38,10 +39,16 @@ type HelloAddress struct { // NewHelloAddress create a new HELLO address from the given address func NewHelloAddress(a *util.Address) *HelloAddress { + // use default expiration time, but adjust it if address expires earlier + exp := util.NewAbsoluteTime(time.Now().Add(HelloAddressExpiration)) + if exp.Compare(a.Expires) > 0 { + exp = a.Expires + } + // convert address addr := &HelloAddress{ transport: a.Netw, addrSize: uint16(len(a.Address)), - expires: a.Expires, + expires: exp, address: make([]byte, len(a.Address)), } copy(addr.address, a.Address) diff --git a/src/gnunet/message/msg_hello_dht.go b/src/gnunet/message/msg_hello_dht.go @@ -25,7 +25,6 @@ import ( "fmt" "gnunet/enums" "gnunet/util" - "io" "time" "github.com/bfix/gospel/crypto/ed25519" @@ -67,49 +66,59 @@ func NewHelloDHTMsg() *HelloDHTMsg { } // Addresses returns the list of HelloAddress -func (m *HelloDHTMsg) Addresses() (list []*HelloAddress, err error) { - rdr := bytes.NewReader(m.AddrList) - var addr *HelloAddress - num := 0 +func (m *HelloDHTMsg) Addresses() (list []*util.Address, err error) { + var addr *util.Address + var as string + num, pos := 0, 0 for { - // parse address from stream - if addr, err = ParseHelloAddr(rdr); err != nil { - // end of stream: no more addresses - if err == io.EOF { - err = nil - } - // check numbers - if num != int(m.NumAddr) { - logger.Printf(logger.WARN, "[HelloDHTMsg] Number of addresses doesn't match (got %d, expected %d)", num, m.NumAddr) - } + // parse address string from stream + if as, pos = util.ReadCString(m.AddrList, pos); pos == -1 { + break + } + if addr, err = util.ParseAddress(as); err != nil { return } + addr.Expires = m.Expires list = append(list, addr) num++ } -} - -// String returns a human-readable representation of the message. -func (m *HelloDHTMsg) String() string { - return fmt.Sprintf("HelloDHTMsg{expire:%s,addrs=%d}", m.Expires, m.NumAddr) + // check numbers + if num != int(m.NumAddr) { + logger.Printf(logger.WARN, "[HelloDHTMsg] Number of addresses does not match (got %d, expected %d)", num, m.NumAddr) + } + return } // SetAddresses adds addresses to the HELLO message. -func (m *HelloDHTMsg) SetAddresses(list []*HelloAddress) { +func (m *HelloDHTMsg) SetAddresses(list []*util.Address) { // write addresses as blob and track earliest expiration - exp := util.AbsoluteTimeNever() + exp := util.NewAbsoluteTime(time.Now().Add(HelloAddressExpiration)) wrt := new(bytes.Buffer) for _, addr := range list { // check if address expires before current expire - if _, after := exp.Diff(addr.expires); !after { - exp = addr.expires + if exp.Compare(addr.Expires) > 0 { + exp = addr.Expires } - n, _ := wrt.Write(addr.Bytes()) - m.MsgSize += uint16(n) - m.NumAddr++ + n, _ := wrt.Write([]byte(addr.URI())) + wrt.WriteByte(0) + m.MsgSize += uint16(n + 1) } m.AddrList = wrt.Bytes() m.Expires = exp + m.NumAddr = uint16(len(list)) +} + +// String returns a human-readable representation of the message. +func (m *HelloDHTMsg) String() string { + addrs, _ := m.Addresses() + aList := "" + for i, a := range addrs { + if i > 0 { + aList += "," + } + aList += a.URI() + } + return fmt.Sprintf("HelloDHTMsg{expire:%s,addrs=%d:[%s]}", m.Expires, m.NumAddr, aList) } // Header returns the message header in a separate instance. @@ -129,6 +138,18 @@ func (m *HelloDHTMsg) Verify(peer *util.PeerID) (bool, error) { return pub.EdVerify(sd, sig) } +// Sign the HELLO data with private key +func (m *HelloDHTMsg) Sign(prv *ed25519.PrivateKey) error { + // assemble signed data + sd := m.signedData() + sig, err := prv.EdSign(sd) + if err != nil { + return err + } + m.Signature = sig.Bytes() + return nil +} + // signedData assembles a data block for sign and verify operations. func (m *HelloDHTMsg) signedData() []byte { // hash address block diff --git a/src/gnunet/service/dht/module.go b/src/gnunet/service/dht/module.go @@ -26,6 +26,8 @@ import ( "gnunet/service" "gnunet/service/dht/blocks" "time" + + "github.com/bfix/gospel/logger" ) //====================================================================== @@ -107,13 +109,23 @@ func (m *Module) Put(ctx context.Context, key blocks.Query, block blocks.Block) // Filter returns the event filter for the module func (m *Module) Filter() *core.EventFilter { f := core.NewEventFilter() + // events we are interested in f.AddEvent(core.EV_CONNECT) f.AddEvent(core.EV_DISCONNECT) + + // messages we are interested in: + // (1) DHT messages f.AddMsgType(message.DHT_CLIENT_GET) f.AddMsgType(message.DHT_CLIENT_GET_RESULTS_KNOWN) f.AddMsgType(message.DHT_CLIENT_GET_STOP) f.AddMsgType(message.DHT_CLIENT_PUT) f.AddMsgType(message.DHT_CLIENT_RESULT) + // (2) DHT_P2P messages + f.AddMsgType(message.DHT_P2P_PUT) + f.AddMsgType(message.DHT_P2P_GET) + f.AddMsgType(message.DHT_P2P_RESULT) + f.AddMsgType(message.DHT_P2P_HELLO) + return f } @@ -123,15 +135,18 @@ func (m *Module) event(ctx context.Context, ev *core.Event) { // New peer connected: case core.EV_CONNECT: // Add peer to routing table + logger.Printf(logger.INFO, "[dht] Peer %s connected", ev.Peer) m.rtable.Add(NewPeerAddress(ev.Peer)) // Peer disconnected: case core.EV_DISCONNECT: // Remove peer from routing table + logger.Printf(logger.INFO, "[dht] Peer %s disconnected", ev.Peer) m.rtable.Remove(NewPeerAddress(ev.Peer)) // Message received. case core.EV_MESSAGE: + logger.Printf(logger.INFO, "[dht] Message received: %s", ev.Msg.String()) // process message (if applicable) if m.ProcessFcn != nil { m.ProcessFcn(ctx, ev.Msg, ev.Resp) diff --git a/src/gnunet/service/dht/service.go b/src/gnunet/service/dht/service.go @@ -134,7 +134,7 @@ func (s *Service) HandleMessage(ctx context.Context, msg message.Message, back t //---------------------------------------------------------- // UNKNOWN message type received //---------------------------------------------------------- - logger.Printf(logger.ERROR, "[dht%s] Unhandled message of type (%d)\n", label, msg.Header().MsgType) + logger.Printf(logger.ERROR, "[dht-%s] Unhandled message of type (%d)\n", label, msg.Header().MsgType) return false } return true diff --git a/src/gnunet/service/rpc.go b/src/gnunet/service/rpc.go @@ -48,13 +48,14 @@ func StartRPC(ctx context.Context, endpoint string) (srvRPC *rpc.Server, err err WriteTimeout: 15 * time.Second, ReadTimeout: 15 * time.Second, } + // start listening + go func() { + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + logger.Printf(logger.WARN, "[RPC] Server listen failed: %s", err.Error()) + } + }() + // wait for shutdown go func() { - // start listening - go func() { - if err := srv.ListenAndServe(); err != nil { - logger.Printf(logger.WARN, "[RPC] Server listen failed: %s", err.Error()) - } - }() select { case <-ctx.Done(): if err := srv.Shutdown(context.Background()); err != nil { diff --git a/src/gnunet/test/gnunet-dhtu/main.go b/src/gnunet/test/gnunet-dhtu/main.go @@ -20,13 +20,14 @@ package main import ( "context" - "encoding/hex" "flag" "fmt" "gnunet/config" "gnunet/core" + "gnunet/message" "gnunet/service" "gnunet/service/dht" + "gnunet/transport" "gnunet/util" "log" "net/rpc" @@ -37,41 +38,31 @@ import ( //---------------------------------------------------------------------- // Test Go node with DHTU GNUnet nodes +// +// N.B.: THIS TEST ONLY COVERS THE BASIC MESSAGE EXCHANGE LEVEL; NO +// MESSAGE PROCESSING EXCEPT FOR HELLO MESSAGES WILL TAKE PLACE. //---------------------------------------------------------------------- func main() { // handle command-line arguments - var ( - remoteId string - remoteAddr string - cfgFile string - ) + var remoteAddr string + var cfgFile string flag.StringVar(&cfgFile, "c", "gnunet-config.json", "configuration file") - flag.StringVar(&remoteId, "i", "", "peer id of remote node") flag.StringVar(&remoteAddr, "a", "", "address of remote node") flag.Parse() // read configuration file and set missing arguments. if err := config.ParseConfig(cfgFile); err != nil { - logger.Printf(logger.ERROR, "[gnunet-dhtu] Invalid configuration file: %s\n", err.Error()) + logger.Printf(logger.ERROR, "[node] Invalid configuration file: %s\n", err.Error()) return } // convert arguments - var ( - rId *util.PeerID - rAddr *util.Address - buf []byte - err error - ) + var rAddr *util.Address + var err error if rAddr, err = util.ParseAddress(remoteAddr); err != nil { - log.Fatal(err) - } - if len(remoteId) > 0 { - if buf, err = util.DecodeStringToBinary(remoteId, 32); err != nil { - log.Fatal(err) - } - rId = util.NewPeerID(buf) + logger.Println(logger.ERROR, err.Error()) + return } // setup execution context @@ -84,7 +75,8 @@ func main() { // create and run node node, err := NewTestNode(ctx) if err != nil { - log.Fatal(err) + logger.Println(logger.ERROR, err.Error()) + return } defer node.Shutdown() @@ -93,13 +85,17 @@ func main() { as := fmt.Sprintf("%s://%s:%d", ep.Network, ep.Address, ep.Port) listen, err := util.ParseAddress(as) if err != nil { - log.Fatal(err) + logger.Println(logger.ERROR, err.Error()) + return } aList := []*util.Address{listen} - logger.Println(logger.INFO, "HELLO: "+node.HelloURL(aList)) + logger.Println(logger.INFO, "[node] --> "+node.HelloURL(aList)) - // learn bootstrap address (triggers HELLO) - node.Learn(ctx, rId, rAddr) + // send HELLO to bootstrap address + if err = node.SendHello(ctx, rAddr); err != nil && err != transport.ErrEndpMaybeSent { + logger.Println(logger.ERROR, "[node] failed to send HELLO: "+err.Error()) + return + } // run forever var ch chan struct{} @@ -121,22 +117,15 @@ func (n *TestNode) Shutdown() { n.core.Shutdown() } func (n *TestNode) HelloURL(a []*util.Address) string { - hd, err := n.peer.HelloData(time.Hour, a) + hd, err := n.peer.HelloData(message.HelloAddressExpiration, a) if err != nil { return "" } return hd.URL() } -func (n *TestNode) Learn(ctx context.Context, peer *util.PeerID, addr *util.Address) { - label := "@" - if peer != nil { - label = peer.String() - } - log.Printf("[%d] Learning %s for %s", n.id, addr.StringAll(), label) - if err := n.core.Learn(ctx, peer, addr); err != nil { - log.Println("Learn: " + err.Error()) - } +func (n *TestNode) SendHello(ctx context.Context, addr *util.Address) error { + return n.core.SendHello(ctx, addr) } func NewTestNode(ctx context.Context) (node *TestNode, err error) { @@ -150,8 +139,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err error) { return } node.peer = node.core.Peer() - log.Printf("[%d] Node %s starting", node.id, node.peer.GetID()) - log.Printf("[%d] --> %s", node.id, hex.EncodeToString(node.peer.GetID().Key)) + logger.Printf(logger.INFO, "[node] Node %s starting", node.peer.GetID()) // start a new DHT service dht, err := dht.NewService(ctx, node.core) @@ -162,7 +150,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err error) { // start JSON-RPC server on request var rpc *rpc.Server if rpc, err = service.StartRPC(ctx, config.Cfg.RPC.Endpoint); err != nil { - logger.Printf(logger.ERROR, "[gnunet-dhtu] RPC failed to start: %s", err.Error()) + logger.Printf(logger.ERROR, "[node] RPC failed to start: %s", err.Error()) return } dht.InitRPC(rpc) @@ -177,7 +165,7 @@ func NewTestNode(ctx context.Context) (node *TestNode, err error) { if node.addr, err = util.ParseAddress(s); err != nil { continue } - log.Printf("[%d] Listening on %s", node.id, s) + logger.Printf(logger.INFO, "[node] Listening on %s", s) } // register as event listener @@ -195,22 +183,22 @@ func NewTestNode(ctx context.Context) (node *TestNode, err error) { case ev := <-incoming: switch ev.ID { case core.EV_CONNECT: - log.Printf("[%d] <<< Peer %s connected", node.id, ev.Peer) + logger.Printf(logger.INFO, "[node] <<< Peer %s connected", ev.Peer) case core.EV_DISCONNECT: - log.Printf("[%d] <<< Peer %s diconnected", node.id, ev.Peer) + logger.Printf(logger.INFO, "[node] <<< Peer %s diconnected", ev.Peer) case core.EV_MESSAGE: - log.Printf("[%d] <<< Msg from %s of type %d", node.id, ev.Peer, ev.Msg.Header().MsgType) - log.Printf("[%d] <<< --> %s", node.id, ev.Msg.String()) + logger.Printf(logger.INFO, "[node] <<< Msg from %s of type %d", ev.Peer, ev.Msg.Header().MsgType) + logger.Printf(logger.INFO, "[node] <<< --> %s", ev.Msg.String()) } // handle termination signal case <-ctx.Done(): - log.Printf("[%d] Shutting down node", node.id) + logger.Println(logger.INFO, "[node] Shutting down node") return // handle heart beat case now := <-tick.C: - log.Printf("[%d] Heart beat at %s", node.id, now.String()) + logger.Printf(logger.INFO, "[node] Heart beat at %s", now.String()) } } }() diff --git a/src/gnunet/transport/endpoint.go b/src/gnunet/transport/endpoint.go @@ -25,6 +25,11 @@ import ( "gnunet/message" "gnunet/util" "net" + "strings" + "sync" + "time" + + "github.com/bfix/gospel/logger" ) var ( @@ -34,6 +39,8 @@ var ( ErrEndpExists = errors.New("endpoint exists") ErrEndpNoAddress = errors.New("no address for endpoint") ErrEndpNoConnection = errors.New("no connection on endpoint") + ErrEndpMaybeSent = errors.New("message may have been sent - cant know") + ErrEndpWriteShort = errors.New("write too short") ) // Endpoint represents a local endpoint that can send and receive messages. @@ -78,9 +85,11 @@ func NewEndpoint(addr net.Addr) (ep Endpoint, err error) { // PacketEndpoint for packet-oriented network protocols type PaketEndpoint struct { id int // endpoint identifier + netw string // network identifier ("udp", "udp4", "udp6", ...) addr net.Addr // endpoint address conn net.PacketConn // packet connection buf []byte // buffer for read/write operations + mtx sync.Mutex // mutex for send operations } // Run packet endpoint: send incoming messages to the handler. @@ -94,9 +103,14 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) ( // use the actual listening address ep.addr = util.NewAddress(xproto, ep.conn.LocalAddr().String()) + // save more information to detect compatible send-to addresses + ep.netw = ep.conn.LocalAddr().Network() + // run watch dog for termination + active := true go func() { <-ctx.Done() + active = false ep.conn.Close() }() // run go routine to handle messages from clients @@ -105,6 +119,15 @@ func (ep *PaketEndpoint) Run(ctx context.Context, hdlr chan *TransportMessage) ( // read next message tm, err := ep.read() if err != nil { + // leave go routine if already dead + if !active { + return + } + logger.Println(logger.WARN, "[pkt_ep] read failed: "+err.Error()) + // gracefully ignore unknown message types + if strings.HasPrefix(err.Error(), "unknown message type") { + continue + } break } // label message @@ -154,19 +177,27 @@ func (ep *PaketEndpoint) read() (tm *TransportMessage, err error) { // Send message to address from endpoint func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *TransportMessage) (err error) { + // only one sender at a time + ep.mtx.Lock() + defer ep.mtx.Unlock() + // check for valid connection if ep.conn == nil { return ErrEndpNoConnection } + // resolve target address var a *net.UDPAddr - a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String()) + if a, err = net.ResolveUDPAddr(EpProtocol(addr.Network()), addr.String()); err != nil { + return + } // get message content (TransportMessage) var buf []byte if buf, err = msg.Bytes(); err != nil { return } + // handle extended protocol: switch ep.addr.Network() { case "ip+udp": @@ -176,8 +207,18 @@ func (ep *PaketEndpoint) Send(ctx context.Context, addr net.Addr, msg *Transport // unknown protocol return ErrEndpProtocolUnknown } - _, err = ep.conn.WriteTo(buf, a) - return + + // timeout after 1 second + if err = ep.conn.SetWriteDeadline(time.Now().Add(time.Second)); err != nil { + logger.Println(logger.DBG, "[pkt_ep] SetWriteDeadline failed: "+err.Error()) + return + } + var n int + n, err = ep.conn.WriteTo(buf, a) + if n != len(buf) { + err = ErrEndpWriteShort + } + return ErrEndpMaybeSent } // Address returms the @@ -188,6 +229,23 @@ func (ep *PaketEndpoint) Address() net.Addr { // CanSendTo returns true if the endpoint can sent to address func (ep *PaketEndpoint) CanSendTo(addr net.Addr) (ok bool) { ok = EpProtocol(addr.Network()) == EpProtocol(ep.addr.Network()) + if ok { + // try to convert addr to compatible type + switch ep.netw { + case "udp", "udp4", "udp6": + var ua *net.UDPAddr + var err error + if ua, err = net.ResolveUDPAddr(ep.netw, addr.String()); err != nil { + ok = false + } + logger.Printf(logger.DBG, "[pkt_ep] %s + %v -> %v (%v)", ep.netw, addr, ua, ok) + default: + logger.Printf(logger.DBG, "[pkt_ep] unknown network %s", ep.netw) + ok = false + } + } else { + logger.Printf(logger.DBG, "[pkt_ep] protocol mismatch %s -- %s", EpProtocol(addr.Network()), EpProtocol(ep.addr.Network())) + } return } diff --git a/src/gnunet/transport/reader_writer.go b/src/gnunet/transport/reader_writer.go @@ -19,7 +19,10 @@ package transport import ( + "bytes" "context" + "encoding/hex" + "encoding/json" "errors" "fmt" "gnunet/message" @@ -126,6 +129,26 @@ func ReadMessage(ctx context.Context, rdr io.ReadCloser, buf []byte) (msg messag } //---------------------------------------------------------------------- +// Dump message +func Dump(msg message.Message, format string) string { + switch format { + case "json": + buf, err := json.Marshal(msg) + if err != nil { + return err.Error() + } + return string(buf) + case "hex": + buf := new(bytes.Buffer) + if err := WriteMessageDirect(buf, msg); err != nil { + return err.Error() + } + return hex.EncodeToString(buf.Bytes()) + } + return "unknown message dump format" +} + +//---------------------------------------------------------------------- // helper for wrapped ReadCloser/WriteCloser (close is nop) //---------------------------------------------------------------------- diff --git a/src/gnunet/util/address_test.go b/src/gnunet/util/address_test.go @@ -0,0 +1,54 @@ +// This file is part of gnunet-go, a GNUnet-implementation in Golang. +// Copyright (C) 2019-2022 Bernd Fix >Y< +// +// gnunet-go is free software: you can redistribute it and/or modify it +// under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// gnunet-go is distributed in the hope that it will be useful, but +// WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// +// SPDX-License-Identifier: AGPL3.0-or-later + +package util + +import ( + "testing" +) + +func TestAddrList(t *testing.T) { + // list of addresses to check + addrS := []string{ + "ip+udp://127.0.0.1:10000", + "ip+udp://172.17.0.4:10000", + "ip+udp://[::ffff:172.17.0.4]:10000", + } + // convert to util.Address + addrA := make([]*Address, len(addrS)) + var err error + for i, as := range addrS { + if addrA[i], err = ParseAddress(as); err != nil { + t.Fatal(err) + } + } + // allocate AddrList + addrL := NewPeerAddrList() + for _, addr := range addrA { + rc := addrL.Add("2BHV4BN8736W5W3CJNXY2S9WABWTGH35QMFG4BPCWBH7DNBCFC60", addr) + t.Logf("added %s (%d)", addr.URI(), rc) + } + + // check list + t.Log("checking list...") + list := addrL.Get("2BHV4BN8736W5W3CJNXY2S9WABWTGH35QMFG4BPCWBH7DNBCFC60", "ip+udp") + t.Logf("got: %v", list) + if len(list) != len(addrS) { + t.Fatal("list size not matching") + } +}