aboutsummaryrefslogtreecommitdiff
path: root/src/gnunet/service/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/gnunet/service/service.go')
-rw-r--r--src/gnunet/service/service.go200
1 files changed, 94 insertions, 106 deletions
diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go
index c996e0c..32ccf67 100644
--- a/src/gnunet/service/service.go
+++ b/src/gnunet/service/service.go
@@ -1,5 +1,5 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang. 1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019, 2020 Bernd Fix >Y< 2// Copyright (C) 2019-2022 Bernd Fix >Y<
3// 3//
4// gnunet-go is free software: you can redistribute it and/or modify it 4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published 5// under the terms of the GNU Affero General Public License as published
@@ -19,145 +19,133 @@
19package service 19package service
20 20
21import ( 21import (
22 "context"
23 "errors"
22 "fmt" 24 "fmt"
23 "net/http" 25 "gnunet/message"
24 "sync" 26 "gnunet/util"
25
26 "gnunet/transport"
27 27
28 "github.com/bfix/gospel/logger" 28 "github.com/bfix/gospel/logger"
29) 29)
30 30
31// Module is an interface for GNUnet service modules (workers). 31//----------------------------------------------------------------------
32type Module interface { 32
33 // RPC returns the route and handler for JSON-RPC requests 33// Responder is a back-channel for messages generated during
34 RPC() (string, func(http.ResponseWriter, *http.Request)) 34// message processing. The Connection type is a responder
35// and used as such in ServeClient().
36type Responder interface {
37 // Handle outgoing message
38 Send(ctx context.Context, msg message.Message) error
39}
40
41// TransportResponder is used as a responder in message handling for
42// messages received from Transport.
43type TransportResponder struct {
44 Peer *util.PeerID
45 SendFcn func(context.Context, *util.PeerID, message.Message) error
35} 46}
36 47
37// Service is an interface for GNUnet services. Every service has one channel 48// Send a message back to caller.
38// end-point it listens to for incoming channel requests (network-based 49func (r *TransportResponder) Send(ctx context.Context, msg message.Message) error {
39// channels established by service clients). The end-point is specified in 50 if r.SendFcn == nil {
40// Channel semantics in the specification string. 51 return errors.New("no send function defined")
52 }
53 return r.SendFcn(ctx, r.Peer, msg)
54}
55
56//----------------------------------------------------------------------
57
58// Service is an interface for GNUnet services
41type Service interface { 59type Service interface {
42 Module 60 Module
43 // Start a service on the given endpoint 61
44 Start(spec string) error 62 // Serve a client session: A service has a socket it listens to for
45 // Serve a client session 63 // incoming connections (sessions) which are used for message exchange
46 ServeClient(ctx *SessionContext, ch *transport.MsgChannel) 64 // with local GNUnet services or clients.
47 // Stop the service 65 ServeClient(ctx context.Context, id int, mc *Connection)
48 Stop() error 66
67 // Handle a single incoming message (either locally from a socket
68 // connection or from Transport). Response messages can be send
69 // via a Responder. Returns true if message was processed.
70 HandleMessage(ctx context.Context, msg message.Message, resp Responder) bool
49} 71}
50 72
51// Impl is an implementation of generic service functionality. 73// SocketHandler handles incoming connections on the local service socket.
52type Impl struct { 74// It delegates calls to ServeClient() and HandleMessage() methods
53 impl Service // Specific service implementation 75// to a custom service 'srv'.
54 hdlr chan transport.Channel // Channel from listener 76type SocketHandler struct {
55 ctrl chan bool // Control channel 77 srv Service // Specific service implementation
56 drop chan int // Channel to drop a session from pending list 78 hdlr chan *Connection // handler for incoming connections
57 srvc transport.ChannelServer // multi-user service 79 cmgr *ConnectionManager // manager for client connections
58 wg *sync.WaitGroup // wait group for go routine synchronization 80 name string // service name
59 name string // service name
60 running bool // service currently running?
61 pending map[int]*SessionContext // list of pending sessions
62} 81}
63 82
64// NewServiceImpl instantiates a new ServiceImpl object. 83// NewSocketHandler instantiates a new socket handler.
65func NewServiceImpl(name string, srv Service) *Impl { 84func NewSocketHandler(name string, srv Service) *SocketHandler {
66 return &Impl{ 85 return &SocketHandler{
67 impl: srv, 86 srv: srv,
68 hdlr: make(chan transport.Channel), 87 hdlr: make(chan *Connection),
69 ctrl: make(chan bool), 88 cmgr: nil,
70 drop: make(chan int), 89 name: name,
71 srvc: nil,
72 wg: new(sync.WaitGroup),
73 name: name,
74 running: false,
75 pending: make(map[int]*SessionContext),
76 } 90 }
77} 91}
78 92
79// Start a service 93// Start the socket handler by listening on a Unix domain socket specified
80func (si *Impl) Start(spec string) (err error) { 94// by its path and additional parameters. Incoming connections from clients
95// are dispatched to 'hdlr'. Stopped socket handlers can be re-started.
96func (h *SocketHandler) Start(ctx context.Context, path string, params map[string]string) (err error) {
81 // check if we are already running 97 // check if we are already running
82 if si.running { 98 if h.cmgr != nil {
83 logger.Printf(logger.ERROR, "Service '%s' already running.\n", si.name) 99 logger.Printf(logger.ERROR, "Service '%s' already running.\n", h.name)
84 return fmt.Errorf("service already running") 100 return fmt.Errorf("service already running")
85 } 101 }
86 102 // start connection manager
87 // start channel server 103 logger.Printf(logger.INFO, "[%s] Service starting.\n", h.name)
88 logger.Printf(logger.INFO, "[%s] Service starting.\n", si.name) 104 if h.cmgr, err = NewConnectionManager(ctx, path, params, h.hdlr); err != nil {
89 if si.srvc, err = transport.NewChannelServer(spec, si.hdlr); err != nil {
90 return 105 return
91 } 106 }
92 si.running = true
93 107
94 // handle clients 108 // handle client connections
95 si.wg.Add(1)
96 go func() { 109 go func() {
97 defer si.wg.Done()
98 loop: 110 loop:
99 for si.running { 111 for {
100 select { 112 select {
101 113
102 // handle incoming connections 114 // handle incoming connection
103 case in := <-si.hdlr: 115 case conn := <-h.hdlr:
104 if in == nil { 116 // run a new session with context
105 logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name) 117 id := util.NextID()
106 break loop 118 logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", h.name, id)
107 } 119
108 switch ch := in.(type) { 120 go func() {
109 case transport.Channel: 121 // serve client on the message channel
110 // run a new session with context 122 h.srv.ServeClient(ctx, id, conn)
111 ctx := NewSessionContext() 123 // session is done now.
112 sessID := ctx.ID 124 logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", h.name, id)
113 si.pending[sessID] = ctx 125 }()
114 logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", si.name, sessID) 126
115 127 // handle termination
116 go func() { 128 case <-ctx.Done():
117 // serve client on the message channel 129 logger.Printf(logger.INFO, "[%s] Listener terminated.\n", h.name)
118 si.impl.ServeClient(ctx, transport.NewMsgChannel(ch))
119 // session is done now.
120 logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", si.name, sessID)
121 si.drop <- sessID
122 }()
123 }
124
125 // handle session removal
126 case sessID := <-si.drop:
127 delete(si.pending, sessID)
128
129 // handle cancelation signal on listener.
130 case <-si.ctrl:
131 break loop 130 break loop
132 } 131 }
133 } 132 }
134 133
135 // terminate pending sessions
136 for _, ctx := range si.pending {
137 logger.Printf(logger.DBG, "[%s] Session '%d' closing...\n", si.name, ctx.ID)
138 ctx.Cancel()
139 }
140
141 // close-down service 134 // close-down service
142 logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name) 135 logger.Printf(logger.INFO, "[%s] Service closing.\n", h.name)
143 si.srvc.Close() 136 h.cmgr.Close()
144 si.running = false
145 }() 137 }()
146 138 return nil
147 return si.impl.Start(spec)
148} 139}
149 140
150// Stop a service 141// Stop socket handler.
151func (si *Impl) Stop() error { 142func (h *SocketHandler) Stop() error {
152 if !si.running { 143 if h.cmgr == nil {
153 logger.Printf(logger.WARN, "Service '%s' not running.\n", si.name) 144 logger.Printf(logger.WARN, "Service '%s' not running.\n", h.name)
154 return fmt.Errorf("service not running") 145 return fmt.Errorf("service not running")
155 } 146 }
156 si.running = false 147 logger.Printf(logger.INFO, "[%s] Service terminating.\n", h.name)
157 si.ctrl <- true 148 h.cmgr.Close()
158 logger.Printf(logger.INFO, "[%s] Service terminating.\n", si.name) 149 h.cmgr = nil
159 150 return nil
160 err := si.impl.Stop()
161 si.wg.Wait()
162 return err
163} 151}