service.go (3079B)
1 // This file is part of gnunet-go, a GNUnet-implementation in Golang. 2 // Copyright (C) 2019-2022 Bernd Fix >Y< 3 // 4 // gnunet-go is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Affero General Public License as published 6 // by the Free Software Foundation, either version 3 of the License, 7 // or (at your option) any later version. 8 // 9 // gnunet-go is distributed in the hope that it will be useful, but 10 // WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 // Affero General Public License for more details. 13 // 14 // You should have received a copy of the GNU Affero General Public License 15 // along with this program. If not, see <http://www.gnu.org/licenses/>. 16 // 17 // SPDX-License-Identifier: AGPL3.0-or-later 18 19 package dht 20 21 import ( 22 "context" 23 "fmt" 24 "io" 25 "time" 26 27 "gnunet/config" 28 "gnunet/core" 29 "gnunet/service" 30 31 "github.com/bfix/gospel/logger" 32 ) 33 34 // Error codes 35 var ( 36 ErrInvalidID = fmt.Errorf("invalid/unassociated ID") 37 ErrBlockExpired = fmt.Errorf("block expired") 38 ErrInvalidResponseType = fmt.Errorf("invald response type") 39 ) 40 41 // Time constants 42 var ( 43 DefaultGetTTL = 10 * time.Minute // timeout for GET requests 44 DiscoveryPeriod = 5 * time.Minute // time between peer discovery runs 45 ) 46 47 //---------------------------------------------------------------------- 48 // "GNUnet R5N DHT" service implementation 49 //---------------------------------------------------------------------- 50 51 // Service implements a DHT service 52 type Service struct { 53 Module 54 } 55 56 // NewService creates a new DHT service instance 57 func NewService(ctx context.Context, c *core.Core, cfg *config.DHTConfig) (*Service, error) { 58 mod, err := NewModule(ctx, c, cfg) 59 if err != nil { 60 return nil, err 61 } 62 srv := &Service{ 63 Module: *mod, 64 } 65 return srv, nil 66 } 67 68 // ServeClient processes a client channel. 69 func (s *Service) ServeClient(ctx context.Context, id int, mc *service.Connection) { 70 reqID := 0 71 var cancel context.CancelFunc 72 ctx, cancel = context.WithCancel(ctx) 73 74 loop: 75 for { 76 // receive next message from client 77 reqID++ 78 logger.Printf(logger.DBG, "[dht:%d:%d] Waiting for client request...\n", id, reqID) 79 msg, err := mc.Receive(ctx) 80 if err != nil { 81 if err == io.EOF { 82 logger.Printf(logger.INFO, "[dht:%d:%d] Client channel closed.\n", id, reqID) 83 } else if err == service.ErrConnectionInterrupted { 84 logger.Printf(logger.INFO, "[dht:%d:%d] Service operation interrupted.\n", id, reqID) 85 } else { 86 logger.Printf(logger.ERROR, "[dht:%d:%d] Message-receive failed: %s\n", id, reqID, err.Error()) 87 } 88 break loop 89 } 90 logger.Printf(logger.INFO, "[dht:%d:%d] Received request: %v\n", id, reqID, msg) 91 92 // handle message 93 valueCtx := context.WithValue(ctx, core.CtxKey("label"), fmt.Sprintf(":%d:%d", id, reqID)) 94 s.HandleMessage(valueCtx, nil, msg, mc) 95 } 96 // close client connection 97 mc.Close() 98 99 // cancel all tasks running for this session/connection 100 logger.Printf(logger.INFO, "[dht:%d] Start closing session...\n", id) 101 cancel() 102 }