module.go (13158B)
1 // This file is part of gnunet-go, a GNUnet-implementation in Golang. 2 // Copyright (C) 2019-2022 Bernd Fix >Y< 3 // 4 // gnunet-go is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Affero General Public License as published 6 // by the Free Software Foundation, either version 3 of the License, 7 // or (at your option) any later version. 8 // 9 // gnunet-go is distributed in the hope that it will be useful, but 10 // WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 // Affero General Public License for more details. 13 // 14 // You should have received a copy of the GNU Affero General Public License 15 // along with this program. If not, see <http://www.gnu.org/licenses/>. 16 // 17 // SPDX-License-Identifier: AGPL3.0-or-later 18 19 package dht 20 21 import ( 22 "context" 23 "encoding/hex" 24 "fmt" 25 "gnunet/config" 26 "gnunet/core" 27 "gnunet/crypto" 28 "gnunet/enums" 29 "gnunet/message" 30 "gnunet/service" 31 "gnunet/service/dht/blocks" 32 "gnunet/service/store" 33 "gnunet/util" 34 gmath "math" 35 "time" 36 37 "github.com/bfix/gospel/logger" 38 ) 39 40 //====================================================================== 41 // "DHT" implementation 42 //====================================================================== 43 44 //---------------------------------------------------------------------- 45 // Responder for local message handling (API, not message-based) 46 //---------------------------------------------------------------------- 47 48 // LocalBlockResponder is a message handler used to handle results for 49 // locally initiated GET calls 50 type LocalBlockResponder struct { 51 ch chan blocks.Block // out-going channel for incoming block results 52 rf blocks.ResultFilter // filter out duplicates 53 } 54 55 // NewLocalBlockResponder returns a new instance 56 func NewLocalBlockResponder() *LocalBlockResponder { 57 return &LocalBlockResponder{ 58 ch: make(chan blocks.Block), 59 rf: blocks.NewGenericResultFilter(128, util.RndUInt32()), 60 } 61 } 62 63 // C returns the back-channel 64 func (lr *LocalBlockResponder) C() <-chan blocks.Block { 65 return lr.ch 66 } 67 68 // Send interface method: dissect message and relay block if appropriate 69 func (lr *LocalBlockResponder) Send(ctx context.Context, msg message.Message) error { 70 // check if incoming message is a DHT-RESULT 71 switch res := msg.(type) { 72 case *message.DHTP2PResultMsg: 73 // deliver incoming blocks 74 go func() { 75 blk, err := blocks.NewBlock(res.BType, res.Expire, res.Block) 76 if err == nil { 77 lr.ch <- blk 78 } else { 79 logger.Println(logger.WARN, "[local] DHT-RESULT block problem: "+err.Error()) 80 // DEBUG: 81 logger.Printf(logger.DBG, "[local] btype=%s, expire=%s", res.BType, res.Expire) 82 logger.Printf(logger.DBG, "[local] block=%s", hex.EncodeToString(res.Block)) 83 panic("@@@") 84 } 85 }() 86 default: 87 logger.Printf(logger.WARN, "[local] %d not a DHT-RESULT -- skipped", msg.Type()) 88 } 89 return nil 90 } 91 92 // Receiver is nil for local responders. 93 func (lr *LocalBlockResponder) Receiver() *util.PeerID { 94 return nil 95 } 96 97 // Close back-channel 98 func (lr *LocalBlockResponder) Close() { 99 close(lr.ch) 100 } 101 102 //---------------------------------------------------------------------- 103 // Put and get blocks into/from a DHT. 104 //---------------------------------------------------------------------- 105 106 // Module handles the permanent storage of blocks under a query key. 107 type Module struct { 108 service.ModuleImpl 109 110 cfg *config.DHTConfig // configuraion parameters 111 store *store.DHTStore // reference to the block storage mechanism 112 core *core.Core // reference to core services 113 114 rtable *RoutingTable // routing table 115 lastHello *message.DHTP2PHelloMsg // last own HELLO message used; re-create if expired 116 reshdlrs *ResultHandlerList // list of open tasks 117 } 118 119 // NewModule returns a new module instance. It initializes the storage 120 // mechanism for persistence. 121 func NewModule(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (m *Module, err error) { 122 // create permanent storage handler 123 var storage *store.DHTStore 124 if storage, err = store.NewDHTStore(cfg.Storage); err != nil { 125 return 126 } 127 // create routing table 128 rt := NewRoutingTable(NewPeerAddress(c.PeerID()), cfg.Routing) 129 130 // return module instance 131 m = &Module{ 132 ModuleImpl: *service.NewModuleImpl(), 133 cfg: cfg, 134 store: storage, 135 core: c, 136 rtable: rt, 137 reshdlrs: NewResultHandlerList(), 138 } 139 // register as listener for core events 140 pulse := time.Duration(cfg.Heartbeat) * time.Second 141 listener := m.Run(ctx, m.event, m.Filter(), pulse, m.heartbeat) 142 c.Register("dht", listener) 143 144 // run periodic tasks (8.2. peer discovery) 145 ticker := time.NewTicker(DiscoveryPeriod) 146 key := crypto.Hash(m.core.PeerID().Bytes()) 147 flags := uint16(enums.DHT_RO_FIND_APPROXIMATE | enums.DHT_RO_DEMULTIPLEX_EVERYWHERE | enums.DHT_RO_DISCOVERY) 148 var resCh <-chan blocks.Block 149 go func() { 150 for { 151 select { 152 // initiate peer discovery 153 case <-ticker.C: 154 // query DHT for our own HELLO block 155 query := blocks.NewGenericQuery(key, enums.BLOCK_TYPE_DHT_HELLO, flags) 156 logger.Printf(logger.DBG, "[dht-discovery] own HELLO key %s", query.Key().Short()) 157 resCh = m.Get(ctx, query) 158 159 // handle peer discover results 160 case res := <-resCh: 161 // check for correct type 162 btype := res.Type() 163 if btype == enums.BLOCK_TYPE_DHT_HELLO { 164 hb, ok := res.(*blocks.HelloBlock) 165 if !ok { 166 logger.Println(logger.WARN, "[dht-discovery] received invalid block data") 167 logger.Printf(logger.DBG, "[dht-discovery] -> %s", hex.EncodeToString(res.Bytes())) 168 } else if !hb.PeerID.Equal(m.core.PeerID()) { 169 // cache HELLO block 170 m.rtable.CacheHello(hb) 171 // add sender to routing table 172 m.rtable.Add(NewPeerAddress(hb.PeerID), "dht-discovery") 173 // learn addresses 174 m.core.Learn(ctx, hb.PeerID, hb.Addresses(), "dht-discovery") 175 } 176 } else { 177 logger.Printf(logger.WARN, "[dht-discovery] received invalid block type %s", btype) 178 } 179 180 // termination 181 case <-ctx.Done(): 182 ticker.Stop() 183 return 184 } 185 } 186 }() 187 return 188 } 189 190 //---------------------------------------------------------------------- 191 // DHT methods for local use (API) 192 //---------------------------------------------------------------------- 193 194 // Get blocks from the DHT ["dht:get"] 195 // Locally request blocks for a given query. The res channel will deliver the 196 // returned results to the caller; the channel is closed if no further blocks 197 // are expected or the query times out. 198 func (m *Module) Get(ctx context.Context, query blocks.Query) <-chan blocks.Block { 199 // get the block handler for given block type to construct an empty 200 // result filter. If no handler is defined, a default PassResultFilter 201 // is created. 202 var rf blocks.ResultFilter = new(blocks.GenericResultFilter) 203 blockHdlr, ok := blocks.BlockHandlers[query.Type()] 204 if ok { 205 // create result filter 206 rf = blockHdlr.SetupResultFilter(128, util.RndUInt32()) 207 } else { 208 logger.Println(logger.WARN, "[dht] unknown result filter implementation -- skipped") 209 } 210 // get additional query parameters 211 xquery, _ := util.GetParam[[]byte](query.Params(), "xquery") 212 213 // assemble a new GET message 214 msg := message.NewDHTP2PGetMsg() 215 msg.BType = query.Type() 216 msg.Flags = query.Flags() 217 msg.HopCount = 0 218 msg.Query = query.Key() 219 msg.ReplLevel = uint16(m.cfg.Routing.ReplLevel) 220 msg.PeerFilter = blocks.NewPeerFilter() 221 msg.ResFilter = rf.Bytes() 222 msg.RfSize = uint16(len(msg.ResFilter)) 223 msg.XQuery = xquery 224 msg.MsgSize += msg.RfSize + uint16(len(xquery)) 225 226 // compose a response channel and handler 227 hdlr := NewLocalBlockResponder() 228 229 // time-out handling 230 ttl, ok := util.GetParam[time.Duration](query.Params(), "timeout") 231 if !ok { 232 // defaults to 10 minutes 233 ttl = DefaultGetTTL 234 } 235 lctx, cancel := context.WithTimeout(ctx, ttl) 236 237 // send message 238 self := m.core.PeerID() 239 msg.PeerFilter.Add(self) 240 go m.HandleMessage(lctx, self, msg, hdlr) 241 go func() { 242 <-lctx.Done() 243 hdlr.Close() 244 cancel() 245 }() 246 return hdlr.C() 247 } 248 249 // Put a block into the DHT ["dht:put"] 250 func (m *Module) Put(ctx context.Context, query blocks.Query, block blocks.Block) error { 251 // assemble a new PUT message 252 msg := message.NewDHTP2PPutMsg(block) 253 msg.Flags = query.Flags() 254 msg.Key = query.Key().Clone() 255 256 // send message 257 self := m.core.PeerID() 258 msg.PeerFilter.Add(self) 259 go m.HandleMessage(ctx, self, msg, nil) 260 261 return nil 262 } 263 264 //---------------------------------------------------------------------- 265 // Event handling 266 //---------------------------------------------------------------------- 267 268 // Filter returns the event filter for the module 269 func (m *Module) Filter() *core.EventFilter { 270 f := core.NewEventFilter() 271 // events we are interested in 272 f.AddEvent(core.EV_CONNECT) 273 f.AddEvent(core.EV_DISCONNECT) 274 275 // messages we are interested in: 276 // (1) DHT_P2P messages 277 f.AddMsgType(enums.MSG_DHT_P2P_PUT) 278 f.AddMsgType(enums.MSG_DHT_P2P_GET) 279 f.AddMsgType(enums.MSG_DHT_P2P_RESULT) 280 f.AddMsgType(enums.MSG_DHT_P2P_HELLO) 281 // (2) DHT messages (legacy, not implemented) 282 f.AddMsgType(enums.MSG_DHT_CLIENT_GET) 283 f.AddMsgType(enums.MSG_DHT_CLIENT_GET_RESULTS_KNOWN) 284 f.AddMsgType(enums.MSG_DHT_CLIENT_GET_STOP) 285 f.AddMsgType(enums.MSG_DHT_CLIENT_PUT) 286 f.AddMsgType(enums.MSG_DHT_CLIENT_RESULT) 287 288 return f 289 } 290 291 // Event handler for infrastructure signals 292 func (m *Module) event(ctx context.Context, ev *core.Event) { 293 switch ev.ID { 294 // New peer connected: 295 case core.EV_CONNECT: 296 // Add peer to routing table 297 logger.Printf(logger.INFO, "[dht-event] Peer %s connected", ev.Peer.Short()) 298 m.rtable.Add(NewPeerAddress(ev.Peer), "dht-event") 299 300 // Peer disconnected: 301 case core.EV_DISCONNECT: 302 // Remove peer from routing table 303 logger.Printf(logger.INFO, "[dht-event] Peer %s disconnected", ev.Peer.Short()) 304 m.rtable.Remove(NewPeerAddress(ev.Peer), "dht-event", 0) 305 306 // Message received. 307 case core.EV_MESSAGE: 308 // generate tracking label 309 label := fmt.Sprintf("dht-msg-%d", util.NextID()) 310 tctx := context.WithValue(ctx, core.CtxKey("label"), label) 311 // check if peer is in routing table (connected peer) 312 if !m.rtable.Contains(NewPeerAddress(ev.Peer), label) { 313 logger.Printf(logger.WARN, "[%s] message %d from unregistered peer -- discarded", label, ev.Msg.Type()) 314 return 315 } 316 // process message 317 if !m.HandleMessage(tctx, ev.Peer, ev.Msg, ev.Resp) { 318 logger.Printf(logger.WARN, "[%s] %s message NOT handled", label, ev.Msg.Type()) 319 } 320 } 321 } 322 323 // ---------------------------------------------------------------------- 324 // Heartbeat handler for periodic tasks 325 func (m *Module) heartbeat(ctx context.Context) { 326 // run heartbeat for routing table 327 m.rtable.heartbeat(ctx) 328 329 // clean-up task list 330 m.reshdlrs.Cleanup() 331 } 332 333 //---------------------------------------------------------------------- 334 // HELLO handling 335 //---------------------------------------------------------------------- 336 337 // Send the currently active HELLO to given network address 338 func (m *Module) SendHello(ctx context.Context, addr *util.Address, label string) (err error) { 339 // get (buffered) HELLO 340 var msg *message.DHTP2PHelloMsg 341 if msg, err = m.getHello(label); err != nil { 342 return 343 } 344 logger.Printf(logger.INFO, "[%s] Sending own HELLO to %s", label, addr.URI()) 345 return m.core.SendToAddr(ctx, addr, msg) 346 } 347 348 // get the recent HELLO if it is defined and not expired; 349 // create a new HELLO otherwise. 350 func (m *Module) getHello(label string) (msg *message.DHTP2PHelloMsg, err error) { 351 if m.lastHello == nil || m.lastHello.Expire.Expired() { 352 // assemble new (signed) HELLO block 353 var addrList []*util.Address 354 if addrList, err = m.core.Addresses(); err != nil { 355 return 356 } 357 // assemble HELLO data 358 hb := new(blocks.HelloBlock) 359 hb.PeerID = m.core.PeerID() 360 hb.SetExpire(message.HelloAddressExpiration) 361 hb.SetAddresses(addrList) 362 363 // sign HELLO block 364 if err = m.core.Sign(hb); err != nil { 365 return 366 } 367 // assemble HELLO message 368 msg = message.NewDHTP2PHelloMsg() 369 msg.Expire = hb.Expire_ 370 msg.SetAddresses(hb.Addresses()) 371 if err = m.core.Sign(msg); err != nil { 372 return 373 } 374 // save for later use 375 m.lastHello = msg 376 377 // DEBUG: 378 var ok bool 379 if ok, err = msg.Verify(m.core.PeerID()); !ok || err != nil { 380 if !ok { 381 err = fmt.Errorf("[%s] failed to verify own HELLO", label) 382 } 383 logger.Println(logger.ERROR, err.Error()) 384 return 385 } 386 logger.Printf(logger.INFO, "[%s] new own HELLO created (expires %s)", label, msg.Expire) 387 return 388 } 389 // we have a valid HELLO for re-use. 390 return m.lastHello, nil 391 } 392 393 //---------------------------------------------------------------------- 394 // Inter-module linkage helpers 395 //---------------------------------------------------------------------- 396 397 // Export functions 398 func (m *Module) Export(fcn map[string]any) { 399 // add exported functions from module 400 fcn["dht:get"] = m.Get 401 fcn["dht:put"] = m.Put 402 } 403 404 // Import functions 405 func (m *Module) Import(fcn map[string]any) { 406 // nothing to import for now. 407 } 408 409 //---------------------------------------------------------------------- 410 411 // SetNetworkSize sets a fixed number of peers in the network 412 func (m *Module) SetNetworkSize(numPeers int) { 413 m.rtable.l2nse = gmath.Log2(float64(numPeers)) 414 }