gnunet-go

GNUnet Bindings for Go
Log | Files | Refs | README | LICENSE

module.go (13158B)


      1 // This file is part of gnunet-go, a GNUnet-implementation in Golang.
      2 // Copyright (C) 2019-2022 Bernd Fix  >Y<
      3 //
      4 // gnunet-go is free software: you can redistribute it and/or modify it
      5 // under the terms of the GNU Affero General Public License as published
      6 // by the Free Software Foundation, either version 3 of the License,
      7 // or (at your option) any later version.
      8 //
      9 // gnunet-go is distributed in the hope that it will be useful, but
     10 // WITHOUT ANY WARRANTY; without even the implied warranty of
     11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     12 // Affero General Public License for more details.
     13 //
     14 // You should have received a copy of the GNU Affero General Public License
     15 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 //
     17 // SPDX-License-Identifier: AGPL-3.0-or-later
     18 
     19 package dht
     20 
     21 import (
     22 	"context"
     23 	"encoding/hex"
     24 	"fmt"
     25 	"gnunet/config"
     26 	"gnunet/core"
     27 	"gnunet/crypto"
     28 	"gnunet/enums"
     29 	"gnunet/message"
     30 	"gnunet/service"
     31 	"gnunet/service/dht/blocks"
     32 	"gnunet/service/store"
     33 	"gnunet/util"
     34 	gmath "math"
     35 	"time"
     36 
     37 	"github.com/bfix/gospel/logger"
     38 )
     39 
     40 //======================================================================
     41 // "DHT" implementation
     42 //======================================================================
     43 
     44 //----------------------------------------------------------------------
     45 // Responder for local message handling (API, not message-based)
     46 //----------------------------------------------------------------------
     47 
     48 // LocalBlockResponder is a message handler used to handle results for
     49 // locally initiated GET calls
     50 type LocalBlockResponder struct {
     51 	ch chan blocks.Block   // out-going channel for incoming block results
     52 	rf blocks.ResultFilter // filter out duplicates
     53 }
     54 
     55 // NewLocalBlockResponder returns a new instance
     56 func NewLocalBlockResponder() *LocalBlockResponder {
     57 	return &LocalBlockResponder{
     58 		ch: make(chan blocks.Block),
     59 		rf: blocks.NewGenericResultFilter(128, util.RndUInt32()),
     60 	}
     61 }
     62 
     63 // C returns the back-channel
     64 func (lr *LocalBlockResponder) C() <-chan blocks.Block {
     65 	return lr.ch
     66 }
     67 
     68 // Send interface method: dissect message and relay block if appropriate
     69 func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) error {
     70 	// check if incoming message is a DHT-RESULT
     71 	switch res := msg.(type) {
     72 	case *message.DHTP2PResultMsg:
     73 		// deliver incoming blocks
     74 		go func() {
     75 			blk, err := blocks.NewBlock(res.BType, res.Expire, res.Block)
     76 			if err == nil {
     77 				lr.ch <- blk
     78 			} else {
     79 				logger.Println(logger.WARN, "[local] DHT-RESULT block problem: "+err.Error())
     80 				// DEBUG:
     81 				logger.Printf(logger.DBG, "[local] btype=%s, expire=%s", res.BType, res.Expire)
     82 				logger.Printf(logger.DBG, "[local] block=%s", hex.EncodeToString(res.Block))
     83 				panic("@@@")
     84 			}
     85 		}()
     86 	default:
     87 		logger.Printf(logger.WARN, "[local] %d not a DHT-RESULT -- skipped", msg.Type())
     88 	}
     89 	return nil
     90 }
     91 
     92 // Receiver is nil for local responders.
     93 func (lr *LocalBlockResponder) Receiver() *util.PeerID {
     94 	return nil
     95 }
     96 
     97 // Close back-channel
     98 func (lr *LocalBlockResponder) Close() {
     99 	close(lr.ch)
    100 }
    101 
    102 //----------------------------------------------------------------------
    103 // Put and get blocks into/from a DHT.
    104 //----------------------------------------------------------------------
    105 
    106 // Module handles the permanent storage of blocks under a query key.
    107 type Module struct {
    108 	service.ModuleImpl
    109 
    110 	cfg   *config.DHTConfig // configuraion parameters
    111 	store *store.DHTStore   // reference to the block storage mechanism
    112 	core  *core.Core        // reference to core services
    113 
    114 	rtable    *RoutingTable           // routing table
    115 	lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired
    116 	reshdlrs  *ResultHandlerList      // list of open tasks
    117 }
    118 
    119 // NewModule returns a new module instance. It initializes the storage
    120 // mechanism for persistence.
    121 func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) {
    122 	// create permanent storage handler
    123 	var storage *store.DHTStore
    124 	if storage, err = store.NewDHTStore(cfg.Storage); err != nil {
    125 		return
    126 	}
    127 	// create routing table
    128 	rt := NewRoutingTable(NewPeerAddress(c.PeerID()), cfg.Routing)
    129 
    130 	// return module instance
    131 	m = &Module{
    132 		ModuleImpl: *service.NewModuleImpl(),
    133 		cfg:        cfg,
    134 		store:      storage,
    135 		core:       c,
    136 		rtable:     rt,
    137 		reshdlrs:   NewResultHandlerList(),
    138 	}
    139 	// register as listener for core events
    140 	pulse := time.Duration(cfg.Heartbeat) * time.Second
    141 	listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat)
    142 	c.Register("dht", listener)
    143 
    144 	// run periodic tasks (8.2. peer discovery)
    145 	ticker := time.NewTicker(DiscoveryPeriod)
    146 	key := crypto.Hash(m.core.PeerID().Bytes())
    147 	flags := uint16(enums.DHT_RO_FIND_APPROXIMATE | enums.DHT_RO_DEMULTIPLEX_EVERYWHERE | enums.DHT_RO_DISCOVERY)
    148 	var resCh <-chan blocks.Block
    149 	go func() {
    150 		for {
    151 			select {
    152 			// initiate peer discovery
    153 			case <-ticker.C:
    154 				// query DHT for our own HELLO block
    155 				query := blocks.NewGenericQuery(key, enums.BLOCK_TYPE_DHT_HELLO, flags)
    156 				logger.Printf(logger.DBG, "[dht-discovery] own HELLO key %s", query.Key().Short())
    157 				resCh = m.Get(ctx, query)
    158 
    159 			// handle peer discover results
    160 			case res := <-resCh:
    161 				// check for correct type
    162 				btype := res.Type()
    163 				if btype == enums.BLOCK_TYPE_DHT_HELLO {
    164 					hb, ok := res.(*blocks.HelloBlock)
    165 					if !ok {
    166 						logger.Println(logger.WARN, "[dht-discovery] received invalid block data")
    167 						logger.Printf(logger.DBG, "[dht-discovery] -> %s", hex.EncodeToString(res.Bytes()))
    168 					} else if !hb.PeerID.Equal(m.core.PeerID()) {
    169 						// cache HELLO block
    170 						m.rtable.CacheHello(hb)
    171 						// add sender to routing table
    172 						m.rtable.Add(NewPeerAddress(hb.PeerID), "dht-discovery")
    173 						// learn addresses
    174 						m.core.Learn(ctx, hb.PeerID, hb.Addresses(), "dht-discovery")
    175 					}
    176 				} else {
    177 					logger.Printf(logger.WARN, "[dht-discovery] received invalid block type %s", btype)
    178 				}
    179 
    180 			// termination
    181 			case <-ctx.Done():
    182 				ticker.Stop()
    183 				return
    184 			}
    185 		}
    186 	}()
    187 	return
    188 }
    189 
    190 //----------------------------------------------------------------------
    191 // DHT methods for local use (API)
    192 //----------------------------------------------------------------------
    193 
    194 // Get blocks from the DHT ["dht:get"]
    195 // Locally request blocks for a given query. The res channel will deliver the
    196 // returned results to the caller; the channel is closed if no further blocks
    197 // are expected or the query times out.
    198 func (m *Module) Get(ctx context.Context, query blocks.Query) <-chan blocks.Block {
    199 	// get the block handler for given block type to construct an empty
    200 	// result filter. If no handler is defined, a default PassResultFilter
    201 	// is created.
    202 	var rf blocks.ResultFilter = new(blocks.GenericResultFilter)
    203 	blockHdlr, ok := blocks.BlockHandlers[query.Type()]
    204 	if ok {
    205 		// create result filter
    206 		rf = blockHdlr.SetupResultFilter(128, util.RndUInt32())
    207 	} else {
    208 		logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped")
    209 	}
    210 	// get additional query parameters
    211 	xquery, _ := util.GetParam[[]byte](query.Params(), "xquery")
    212 
    213 	// assemble a new GET message
    214 	msg := message.NewDHTP2PGetMsg()
    215 	msg.BType = query.Type()
    216 	msg.Flags = query.Flags()
    217 	msg.HopCount = 0
    218 	msg.Query = query.Key()
    219 	msg.ReplLevel = uint16(m.cfg.Routing.ReplLevel)
    220 	msg.PeerFilter = blocks.NewPeerFilter()
    221 	msg.ResFilter = rf.Bytes()
    222 	msg.RfSize = uint16(len(msg.ResFilter))
    223 	msg.XQuery = xquery
    224 	msg.MsgSize += msg.RfSize + uint16(len(xquery))
    225 
    226 	// compose a response channel and handler
    227 	hdlr := NewLocalBlockResponder()
    228 
    229 	// time-out handling
    230 	ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout")
    231 	if !ok {
    232 		// defaults to 10 minutes
    233 		ttl = DefaultGetTTL
    234 	}
    235 	lctx, cancel := context.WithTimeout(ctx, ttl)
    236 
    237 	// send message
    238 	self := m.core.PeerID()
    239 	msg.PeerFilter.Add(self)
    240 	go m.HandleMessage(lctx, self, msg, hdlr)
    241 	go func() {
    242 		<-lctx.Done()
    243 		hdlr.Close()
    244 		cancel()
    245 	}()
    246 	return hdlr.C()
    247 }
    248 
    249 // Put a block into the DHT ["dht:put"]
    250 func (m *Module) Put(ctx context.Context, query blocks.Query, block blocks.Block) error {
    251 	// assemble a new PUT message
    252 	msg := message.NewDHTP2PPutMsg(block)
    253 	msg.Flags = query.Flags()
    254 	msg.Key = query.Key().Clone()
    255 
    256 	// send message
    257 	self := m.core.PeerID()
    258 	msg.PeerFilter.Add(self)
    259 	go m.HandleMessage(ctx, self, msg, nil)
    260 
    261 	return nil
    262 }
    263 
    264 //----------------------------------------------------------------------
    265 // Event handling
    266 //----------------------------------------------------------------------
    267 
    268 // Filter returns the event filter for the module
    269 func (m *Module) Filter() *core.EventFilter {
    270 	f := core.NewEventFilter()
    271 	// events we are interested in
    272 	f.AddEvent(core.EV_CONNECT)
    273 	f.AddEvent(core.EV_DISCONNECT)
    274 
    275 	// messages we are interested in:
    276 	// (1) DHT_P2P messages
    277 	f.AddMsgType(enums.MSG_DHT_P2P_PUT)
    278 	f.AddMsgType(enums.MSG_DHT_P2P_GET)
    279 	f.AddMsgType(enums.MSG_DHT_P2P_RESULT)
    280 	f.AddMsgType(enums.MSG_DHT_P2P_HELLO)
    281 	// (2) DHT messages (legacy, not implemented)
    282 	f.AddMsgType(enums.MSG_DHT_CLIENT_GET)
    283 	f.AddMsgType(enums.MSG_DHT_CLIENT_GET_RESULTS_KNOWN)
    284 	f.AddMsgType(enums.MSG_DHT_CLIENT_GET_STOP)
    285 	f.AddMsgType(enums.MSG_DHT_CLIENT_PUT)
    286 	f.AddMsgType(enums.MSG_DHT_CLIENT_RESULT)
    287 
    288 	return f
    289 }
    290 
    291 // Event handler for infrastructure signals
    292 func (m *Module) event(ctx context.Context, ev *core.Event) {
    293 	switch ev.ID {
    294 	// New peer connected:
    295 	case core.EV_CONNECT:
    296 		// Add peer to routing table
    297 		logger.Printf(logger.INFO, "[dht-event] Peer %s connected", ev.Peer.Short())
    298 		m.rtable.Add(NewPeerAddress(ev.Peer), "dht-event")
    299 
    300 	// Peer disconnected:
    301 	case core.EV_DISCONNECT:
    302 		// Remove peer from routing table
    303 		logger.Printf(logger.INFO, "[dht-event] Peer %s disconnected", ev.Peer.Short())
    304 		m.rtable.Remove(NewPeerAddress(ev.Peer), "dht-event", 0)
    305 
    306 	// Message received.
    307 	case core.EV_MESSAGE:
    308 		// generate tracking label
    309 		label := fmt.Sprintf("dht-msg-%d", util.NextID())
    310 		tctx := context.WithValue(ctx, core.CtxKey("label"), label)
    311 		// check if peer is in routing table (connected peer)
    312 		if !m.rtable.Contains(NewPeerAddress(ev.Peer), label) {
    313 			logger.Printf(logger.WARN, "[%s] message %d from unregistered peer -- discarded", label, ev.Msg.Type())
    314 			return
    315 		}
    316 		// process message
    317 		if !m.HandleMessage(tctx, ev.Peer, ev.Msg, ev.Resp) {
    318 			logger.Printf(logger.WARN, "[%s] %s message NOT handled", label, ev.Msg.Type())
    319 		}
    320 	}
    321 }
    322 
    323 // ----------------------------------------------------------------------
    324 // Heartbeat handler for periodic tasks
    325 func (m *Module) heartbeat(ctx context.Context) {
    326 	// run heartbeat for routing table
    327 	m.rtable.heartbeat(ctx)
    328 
    329 	// clean-up task list
    330 	m.reshdlrs.Cleanup()
    331 }
    332 
    333 //----------------------------------------------------------------------
    334 // HELLO handling
    335 //----------------------------------------------------------------------
    336 
    337 // Send the currently active HELLO to given network address
    338 func (m *Module) SendHello(ctx context.Context, addr *util.Address, label string) (err error) {
    339 	// get (buffered) HELLO
    340 	var msg *message.DHTP2PHelloMsg
    341 	if msg, err = m.getHello(label); err != nil {
    342 		return
    343 	}
    344 	logger.Printf(logger.INFO, "[%s] Sending own HELLO to %s", label, addr.URI())
    345 	return m.core.SendToAddr(ctx, addr, msg)
    346 }
    347 
    348 // get the recent HELLO if it is defined and not expired;
    349 // create a new HELLO otherwise.
    350 func (m *Module) getHello(label string) (msg *message.DHTP2PHelloMsg, err error) {
    351 	if m.lastHello == nil || m.lastHello.Expire.Expired() {
    352 		// assemble new (signed) HELLO block
    353 		var addrList []*util.Address
    354 		if addrList, err = m.core.Addresses(); err != nil {
    355 			return
    356 		}
    357 		// assemble HELLO data
    358 		hb := new(blocks.HelloBlock)
    359 		hb.PeerID = m.core.PeerID()
    360 		hb.SetExpire(message.HelloAddressExpiration)
    361 		hb.SetAddresses(addrList)
    362 
    363 		// sign HELLO block
    364 		if err = m.core.Sign(hb); err != nil {
    365 			return
    366 		}
    367 		// assemble HELLO message
    368 		msg = message.NewDHTP2PHelloMsg()
    369 		msg.Expire = hb.Expire_
    370 		msg.SetAddresses(hb.Addresses())
    371 		if err = m.core.Sign(msg); err != nil {
    372 			return
    373 		}
    374 		// save for later use
    375 		m.lastHello = msg
    376 
    377 		// DEBUG:
    378 		var ok bool
    379 		if ok, err = msg.Verify(m.core.PeerID()); !ok || err != nil {
    380 			if !ok {
    381 				err = fmt.Errorf("[%s] failed to verify own HELLO", label)
    382 			}
    383 			logger.Println(logger.ERROR, err.Error())
    384 			return
    385 		}
    386 		logger.Printf(logger.INFO, "[%s] new own HELLO created (expires %s)", label, msg.Expire)
    387 		return
    388 	}
    389 	// we have a valid HELLO for re-use.
    390 	return m.lastHello, nil
    391 }
    392 
    393 //----------------------------------------------------------------------
    394 // Inter-module linkage helpers
    395 //----------------------------------------------------------------------
    396 
    397 // Export functions
    398 func (m *Module) Export(fcn map[string]any) {
    399 	// add exported functions from module
    400 	fcn["dht:get"] = m.Get
    401 	fcn["dht:put"] = m.Put
    402 }
    403 
    404 // Import functions
    405 func (m *Module) Import(fcn map[string]any) {
    406 	// nothing to import for now.
    407 }
    408 
    409 //----------------------------------------------------------------------
    410 
    411 // SetNetworkSize sets a fixed number of peers in the network
    412 func (m *Module) SetNetworkSize(numPeers int) {
    413 	m.rtable.l2nse = gmath.Log2(float64(numPeers))
    414 }