diff options
Diffstat (limited to 'src/gnunet/service/service.go')
-rw-r--r-- | src/gnunet/service/service.go | 200 |
1 files changed, 94 insertions, 106 deletions
diff --git a/src/gnunet/service/service.go b/src/gnunet/service/service.go index c996e0c..32ccf67 100644 --- a/src/gnunet/service/service.go +++ b/src/gnunet/service/service.go | |||
@@ -1,5 +1,5 @@ | |||
1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. | 1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. |
2 | // Copyright (C) 2019, 2020 Bernd Fix >Y< | 2 | // Copyright (C) 2019-2022 Bernd Fix >Y< |
3 | // | 3 | // |
4 | // gnunet-go is free software: you can redistribute it and/or modify it | 4 | // gnunet-go is free software: you can redistribute it and/or modify it |
5 | // under the terms of the GNU Affero General Public License as published | 5 | // under the terms of the GNU Affero General Public License as published |
@@ -19,145 +19,133 @@ | |||
19 | package service | 19 | package service |
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "context" | ||
23 | "errors" | ||
22 | "fmt" | 24 | "fmt" |
23 | "net/http" | 25 | "gnunet/message" |
24 | "sync" | 26 | "gnunet/util" |
25 | |||
26 | "gnunet/transport" | ||
27 | 27 | ||
28 | "github.com/bfix/gospel/logger" | 28 | "github.com/bfix/gospel/logger" |
29 | ) | 29 | ) |
30 | 30 | ||
31 | // Module is an interface for GNUnet service modules (workers). | 31 | //---------------------------------------------------------------------- |
32 | type Module interface { | 32 | |
33 | // RPC returns the route and handler for JSON-RPC requests | 33 | // Responder is a back-channel for messages generated during |
34 | RPC() (string, func(http.ResponseWriter, *http.Request)) | 34 | // message processing. The Connection type is a responder |
35 | // and used as such in ServeClient(). | ||
36 | type Responder interface { | ||
37 | // Handle outgoing message | ||
38 | Send(ctx context.Context, msg message.Message) error | ||
39 | } | ||
40 | |||
41 | // TransportResponder is used as a responder in message handling for | ||
42 | // messages received from Transport. | ||
43 | type TransportResponder struct { | ||
44 | Peer *util.PeerID | ||
45 | SendFcn func(context.Context, *util.PeerID, message.Message) error | ||
35 | } | 46 | } |
36 | 47 | ||
37 | // Service is an interface for GNUnet services. Every service has one channel | 48 | // Send a message back to caller. |
38 | // end-point it listens to for incoming channel requests (network-based | 49 | func (r *TransportResponder) Send(ctx context.Context, msg message.Message) error { |
39 | // channels established by service clients). The end-point is specified in | 50 | if r.SendFcn == nil { |
40 | // Channel semantics in the specification string. | 51 | return errors.New("no send function defined") |
52 | } | ||
53 | return r.SendFcn(ctx, r.Peer, msg) | ||
54 | } | ||
55 | |||
56 | //---------------------------------------------------------------------- | ||
57 | |||
58 | // Service is an interface for GNUnet services | ||
41 | type Service interface { | 59 | type Service interface { |
42 | Module | 60 | Module |
43 | // Start a service on the given endpoint | 61 | |
44 | Start(spec string) error | 62 | // Serve a client session: A service has a socket it listens to for |
45 | // Serve a client session | 63 | // incoming connections (sessions) which are used for message exchange |
46 | ServeClient(ctx *SessionContext, ch *transport.MsgChannel) | 64 | // with local GNUnet services or clients. |
47 | // Stop the service | 65 | ServeClient(ctx context.Context, id int, mc *Connection) |
48 | Stop() error | 66 | |
67 | // Handle a single incoming message (either locally from a socket | ||
68 | // connection or from Transport). Response messages can be send | ||
69 | // via a Responder. Returns true if message was processed. | ||
70 | HandleMessage(ctx context.Context, msg message.Message, resp Responder) bool | ||
49 | } | 71 | } |
50 | 72 | ||
51 | // Impl is an implementation of generic service functionality. | 73 | // SocketHandler handles incoming connections on the local service socket. |
52 | type Impl struct { | 74 | // It delegates calls to ServeClient() and HandleMessage() methods |
53 | impl Service // Specific service implementation | 75 | // to a custom service 'srv'. |
54 | hdlr chan transport.Channel // Channel from listener | 76 | type SocketHandler struct { |
55 | ctrl chan bool // Control channel | 77 | srv Service // Specific service implementation |
56 | drop chan int // Channel to drop a session from pending list | 78 | hdlr chan *Connection // handler for incoming connections |
57 | srvc transport.ChannelServer // multi-user service | 79 | cmgr *ConnectionManager // manager for client connections |
58 | wg *sync.WaitGroup // wait group for go routine synchronization | 80 | name string // service name |
59 | name string // service name | ||
60 | running bool // service currently running? | ||
61 | pending map[int]*SessionContext // list of pending sessions | ||
62 | } | 81 | } |
63 | 82 | ||
64 | // NewServiceImpl instantiates a new ServiceImpl object. | 83 | // NewSocketHandler instantiates a new socket handler. |
65 | func NewServiceImpl(name string, srv Service) *Impl { | 84 | func NewSocketHandler(name string, srv Service) *SocketHandler { |
66 | return &Impl{ | 85 | return &SocketHandler{ |
67 | impl: srv, | 86 | srv: srv, |
68 | hdlr: make(chan transport.Channel), | 87 | hdlr: make(chan *Connection), |
69 | ctrl: make(chan bool), | 88 | cmgr: nil, |
70 | drop: make(chan int), | 89 | name: name, |
71 | srvc: nil, | ||
72 | wg: new(sync.WaitGroup), | ||
73 | name: name, | ||
74 | running: false, | ||
75 | pending: make(map[int]*SessionContext), | ||
76 | } | 90 | } |
77 | } | 91 | } |
78 | 92 | ||
79 | // Start a service | 93 | // Start the socket handler by listening on a Unix domain socket specified |
80 | func (si *Impl) Start(spec string) (err error) { | 94 | // by its path and additional parameters. Incoming connections from clients |
95 | // are dispatched to 'hdlr'. Stopped socket handlers can be re-started. | ||
96 | func (h *SocketHandler) Start(ctx context.Context, path string, params map[string]string) (err error) { | ||
81 | // check if we are already running | 97 | // check if we are already running |
82 | if si.running { | 98 | if h.cmgr != nil { |
83 | logger.Printf(logger.ERROR, "Service '%s' already running.\n", si.name) | 99 | logger.Printf(logger.ERROR, "Service '%s' already running.\n", h.name) |
84 | return fmt.Errorf("service already running") | 100 | return fmt.Errorf("service already running") |
85 | } | 101 | } |
86 | 102 | // start connection manager | |
87 | // start channel server | 103 | logger.Printf(logger.INFO, "[%s] Service starting.\n", h.name) |
88 | logger.Printf(logger.INFO, "[%s] Service starting.\n", si.name) | 104 | if h.cmgr, err = NewConnectionManager(ctx, path, params, h.hdlr); err != nil { |
89 | if si.srvc, err = transport.NewChannelServer(spec, si.hdlr); err != nil { | ||
90 | return | 105 | return |
91 | } | 106 | } |
92 | si.running = true | ||
93 | 107 | ||
94 | // handle clients | 108 | // handle client connections |
95 | si.wg.Add(1) | ||
96 | go func() { | 109 | go func() { |
97 | defer si.wg.Done() | ||
98 | loop: | 110 | loop: |
99 | for si.running { | 111 | for { |
100 | select { | 112 | select { |
101 | 113 | ||
102 | // handle incoming connections | 114 | // handle incoming connection |
103 | case in := <-si.hdlr: | 115 | case conn := <-h.hdlr: |
104 | if in == nil { | 116 | // run a new session with context |
105 | logger.Printf(logger.INFO, "[%s] Listener terminated.\n", si.name) | 117 | id := util.NextID() |
106 | break loop | 118 | logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", h.name, id) |
107 | } | 119 | |
108 | switch ch := in.(type) { | 120 | go func() { |
109 | case transport.Channel: | 121 | // serve client on the message channel |
110 | // run a new session with context | 122 | h.srv.ServeClient(ctx, id, conn) |
111 | ctx := NewSessionContext() | 123 | // session is done now. |
112 | sessID := ctx.ID | 124 | logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", h.name, id) |
113 | si.pending[sessID] = ctx | 125 | }() |
114 | logger.Printf(logger.INFO, "[%s] Session '%d' started.\n", si.name, sessID) | 126 | |
115 | 127 | // handle termination | |
116 | go func() { | 128 | case <-ctx.Done(): |
117 | // serve client on the message channel | 129 | logger.Printf(logger.INFO, "[%s] Listener terminated.\n", h.name) |
118 | si.impl.ServeClient(ctx, transport.NewMsgChannel(ch)) | ||
119 | // session is done now. | ||
120 | logger.Printf(logger.INFO, "[%s] Session with client '%d' ended.\n", si.name, sessID) | ||
121 | si.drop <- sessID | ||
122 | }() | ||
123 | } | ||
124 | |||
125 | // handle session removal | ||
126 | case sessID := <-si.drop: | ||
127 | delete(si.pending, sessID) | ||
128 | |||
129 | // handle cancelation signal on listener. | ||
130 | case <-si.ctrl: | ||
131 | break loop | 130 | break loop |
132 | } | 131 | } |
133 | } | 132 | } |
134 | 133 | ||
135 | // terminate pending sessions | ||
136 | for _, ctx := range si.pending { | ||
137 | logger.Printf(logger.DBG, "[%s] Session '%d' closing...\n", si.name, ctx.ID) | ||
138 | ctx.Cancel() | ||
139 | } | ||
140 | |||
141 | // close-down service | 134 | // close-down service |
142 | logger.Printf(logger.INFO, "[%s] Service closing.\n", si.name) | 135 | logger.Printf(logger.INFO, "[%s] Service closing.\n", h.name) |
143 | si.srvc.Close() | 136 | h.cmgr.Close() |
144 | si.running = false | ||
145 | }() | 137 | }() |
146 | 138 | return nil | |
147 | return si.impl.Start(spec) | ||
148 | } | 139 | } |
149 | 140 | ||
150 | // Stop a service | 141 | // Stop socket handler. |
151 | func (si *Impl) Stop() error { | 142 | func (h *SocketHandler) Stop() error { |
152 | if !si.running { | 143 | if h.cmgr == nil { |
153 | logger.Printf(logger.WARN, "Service '%s' not running.\n", si.name) | 144 | logger.Printf(logger.WARN, "Service '%s' not running.\n", h.name) |
154 | return fmt.Errorf("service not running") | 145 | return fmt.Errorf("service not running") |
155 | } | 146 | } |
156 | si.running = false | 147 | logger.Printf(logger.INFO, "[%s] Service terminating.\n", h.name) |
157 | si.ctrl <- true | 148 | h.cmgr.Close() |
158 | logger.Printf(logger.INFO, "[%s] Service terminating.\n", si.name) | 149 | h.cmgr = nil |
159 | 150 | return nil | |
160 | err := si.impl.Stop() | ||
161 | si.wg.Wait() | ||
162 | return err | ||
163 | } | 151 | } |