diff options
Diffstat (limited to 'src/gnunet/service/connection.go')
-rw-r--r-- | src/gnunet/service/connection.go | 280 |
1 files changed, 280 insertions, 0 deletions
diff --git a/src/gnunet/service/connection.go b/src/gnunet/service/connection.go new file mode 100644 index 0000000..1c690c5 --- /dev/null +++ b/src/gnunet/service/connection.go | |||
@@ -0,0 +1,280 @@ | |||
1 | // This file is part of gnunet-go, a GNUnet-implementation in Golang. | ||
2 | // Copyright (C) 2019-2022 Bernd Fix >Y< | ||
3 | // | ||
4 | // gnunet-go is free software: you can redistribute it and/or modify it | ||
5 | // under the terms of the GNU Affero General Public License as published | ||
6 | // by the Free Software Foundation, either version 3 of the License, | ||
7 | // or (at your option) any later version. | ||
8 | // | ||
9 | // gnunet-go is distributed in the hope that it will be useful, but | ||
10 | // WITHOUT ANY WARRANTY; without even the implied warranty of | ||
11 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
12 | // Affero General Public License for more details. | ||
13 | // | ||
14 | // You should have received a copy of the GNU Affero General Public License | ||
15 | // along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
16 | // | ||
17 | // SPDX-License-Identifier: AGPL3.0-or-later | ||
18 | |||
19 | package service | ||
20 | |||
21 | import ( | ||
22 | "context" | ||
23 | "errors" | ||
24 | "fmt" | ||
25 | "gnunet/message" | ||
26 | "net" | ||
27 | "os" | ||
28 | "strconv" | ||
29 | |||
30 | "github.com/bfix/gospel/data" | ||
31 | "github.com/bfix/gospel/logger" | ||
32 | ) | ||
33 | |||
34 | // Error codes | ||
35 | var ( | ||
36 | ErrConnectionNotOpened = errors.New("channel not opened") | ||
37 | ErrConnectionInterrupted = errors.New("channel interrupted") | ||
38 | ) | ||
39 | |||
40 | //====================================================================== | ||
41 | |||
42 | // Connection is a channel for GNUnet message exchange (send/receive) | ||
43 | // based on Unix domain sockets. It is used locally by services and | ||
44 | // clients in the standard GNUnet environment. | ||
45 | type Connection struct { | ||
46 | path string // file name of Unix socket | ||
47 | conn net.Conn // associated connection | ||
48 | buf []byte // read/write buffer | ||
49 | } | ||
50 | |||
51 | // NewConnection creates a new connection to a socket with given path. | ||
52 | // This is used by clients to connect to a service. | ||
53 | func NewConnection(ctx context.Context, path string) (s *Connection, err error) { | ||
54 | var d net.Dialer | ||
55 | s = new(Connection) | ||
56 | s.path = path | ||
57 | s.buf = make([]byte, 65536) | ||
58 | s.conn, err = d.DialContext(ctx, "unix", path) | ||
59 | return | ||
60 | } | ||
61 | |||
62 | // Close a socket connection | ||
63 | func (s *Connection) Close() error { | ||
64 | if s.conn != nil { | ||
65 | rc := s.conn.Close() | ||
66 | s.conn = nil | ||
67 | return rc | ||
68 | } | ||
69 | return ErrConnectionNotOpened | ||
70 | } | ||
71 | |||
72 | // Send a GNUnet message over a socket. | ||
73 | func (s *Connection) Send(ctx context.Context, msg message.Message) error { | ||
74 | // convert message to binary data | ||
75 | data, err := data.Marshal(msg) | ||
76 | if err != nil { | ||
77 | return err | ||
78 | } | ||
79 | // check message header size and packet size | ||
80 | mh, err := message.GetMsgHeader(data) | ||
81 | if err != nil { | ||
82 | return err | ||
83 | } | ||
84 | if len(data) != int(mh.MsgSize) { | ||
85 | return errors.New("send: message size mismatch") | ||
86 | } | ||
87 | |||
88 | // send packet | ||
89 | n, err := s.write(ctx, data) | ||
90 | if err != nil { | ||
91 | return err | ||
92 | } | ||
93 | if n != len(data) { | ||
94 | return errors.New("incomplete send") | ||
95 | } | ||
96 | return nil | ||
97 | } | ||
98 | |||
99 | // Receive GNUnet messages from socket. | ||
100 | func (s *Connection) Receive(ctx context.Context) (message.Message, error) { | ||
101 | // get bytes from socket | ||
102 | get := func(pos, count int) error { | ||
103 | n, err := s.read(ctx, s.buf[pos:pos+count]) | ||
104 | if err != nil { | ||
105 | return err | ||
106 | } | ||
107 | if n != count { | ||
108 | return errors.New("not enough bytes on network") | ||
109 | } | ||
110 | return nil | ||
111 | } | ||
112 | // read header first | ||
113 | if err := get(0, 4); err != nil { | ||
114 | return nil, err | ||
115 | } | ||
116 | mh, err := message.GetMsgHeader(s.buf[:4]) | ||
117 | if err != nil { | ||
118 | return nil, err | ||
119 | } | ||
120 | // get rest of message | ||
121 | if err := get(4, int(mh.MsgSize)-4); err != nil { | ||
122 | return nil, err | ||
123 | } | ||
124 | msg, err := message.NewEmptyMessage(mh.MsgType) | ||
125 | if err != nil { | ||
126 | return nil, err | ||
127 | } | ||
128 | if msg == nil { | ||
129 | return nil, fmt.Errorf("message{%d} is nil", mh.MsgType) | ||
130 | } | ||
131 | if err = data.Unmarshal(msg, s.buf[:mh.MsgSize]); err != nil { | ||
132 | return nil, err | ||
133 | } | ||
134 | return msg, nil | ||
135 | } | ||
136 | |||
137 | //---------------------------------------------------------------------- | ||
138 | // internal methods | ||
139 | //---------------------------------------------------------------------- | ||
140 | |||
141 | // result of read/write operations on sockets. | ||
142 | type result struct { | ||
143 | n int // number of bytes read/written | ||
144 | err error // error (or nil) | ||
145 | } | ||
146 | |||
147 | // Read bytes from a socket into buffer: Returns the number of read | ||
148 | // bytes and an error code. Only works on open channels ;) | ||
149 | func (s *Connection) read(ctx context.Context, buf []byte) (int, error) { | ||
150 | // check if the channel is open | ||
151 | if s.conn == nil { | ||
152 | return 0, ErrConnectionNotOpened | ||
153 | } | ||
154 | // perform read operation | ||
155 | ch := make(chan *result) | ||
156 | go func() { | ||
157 | n, err := s.conn.Read(buf) | ||
158 | ch <- &result{n, err} | ||
159 | }() | ||
160 | for { | ||
161 | select { | ||
162 | // terminate on request | ||
163 | case <-ctx.Done(): | ||
164 | return 0, ErrConnectionInterrupted | ||
165 | |||
166 | // handle result of read operation | ||
167 | case res := <-ch: | ||
168 | return res.n, res.err | ||
169 | } | ||
170 | } | ||
171 | } | ||
172 | |||
173 | // Write buffer to socket and returns the number of bytes written and an | ||
174 | // optional error code. | ||
175 | func (s *Connection) write(ctx context.Context, buf []byte) (int, error) { | ||
176 | // check if we have an open socket to write to. | ||
177 | if s.conn == nil { | ||
178 | return 0, ErrConnectionNotOpened | ||
179 | } | ||
180 | // perform write operation | ||
181 | ch := make(chan *result) | ||
182 | go func() { | ||
183 | n, err := s.conn.Write(buf) | ||
184 | ch <- &result{n, err} | ||
185 | }() | ||
186 | for { | ||
187 | select { | ||
188 | // handle terminate command | ||
189 | case <-ctx.Done(): | ||
190 | return 0, ErrConnectionInterrupted | ||
191 | |||
192 | // handle result of write operation | ||
193 | case res := <-ch: | ||
194 | return res.n, res.err | ||
195 | } | ||
196 | } | ||
197 | } | ||
198 | |||
199 | //====================================================================== | ||
200 | |||
201 | // ConnectionManager to handle client connections on a socket. | ||
202 | type ConnectionManager struct { | ||
203 | listener net.Listener // reference to listener object | ||
204 | running bool // server running? | ||
205 | } | ||
206 | |||
207 | // NewConnectionManager creates a new socket connection manager. Incoming | ||
208 | // connections from clients are dispatched to a handler channel. | ||
209 | func NewConnectionManager( | ||
210 | ctx context.Context, // execution context | ||
211 | path string, // socket file name | ||
212 | params map[string]string, // connection parameters | ||
213 | hdlr chan *Connection, // handler for incoming connections | ||
214 | ) (cs *ConnectionManager, err error) { | ||
215 | |||
216 | // instantiate channel server | ||
217 | cs = &ConnectionManager{ | ||
218 | listener: nil, | ||
219 | running: false, | ||
220 | } | ||
221 | // create listener | ||
222 | var lc net.ListenConfig | ||
223 | if cs.listener, err = lc.Listen(ctx, "unix", path); err != nil { | ||
224 | return | ||
225 | } | ||
226 | // handle additional parameters | ||
227 | if params != nil { | ||
228 | for key, value := range params { | ||
229 | switch key { | ||
230 | case "perm": // set permissions on 'unix' | ||
231 | if perm, err := strconv.ParseInt(value, 8, 32); err == nil { | ||
232 | if err := os.Chmod(path, os.FileMode(perm)); err != nil { | ||
233 | logger.Printf( | ||
234 | logger.ERROR, | ||
235 | "MsgChannelServer: Failed to set permissions %s on %s: %s\n", | ||
236 | path, value, err.Error()) | ||
237 | |||
238 | } | ||
239 | } else { | ||
240 | logger.Printf( | ||
241 | logger.ERROR, | ||
242 | "MsgChannelServer: Invalid permissions '%s'\n", | ||
243 | value) | ||
244 | } | ||
245 | } | ||
246 | } | ||
247 | } | ||
248 | // run go routine to handle channel requests from clients | ||
249 | cs.running = true | ||
250 | go func() { | ||
251 | for cs.running { | ||
252 | conn, err := cs.listener.Accept() | ||
253 | if err != nil { | ||
254 | break | ||
255 | } | ||
256 | // handle connection | ||
257 | c := &Connection{ | ||
258 | conn: conn, | ||
259 | path: path, | ||
260 | buf: make([]byte, 65536), | ||
261 | } | ||
262 | hdlr <- c | ||
263 | } | ||
264 | if cs.listener != nil { | ||
265 | cs.listener.Close() | ||
266 | } | ||
267 | }() | ||
268 | return cs, nil | ||
269 | } | ||
270 | |||
271 | // Close a network channel server (= stop the server) | ||
272 | func (s *ConnectionManager) Close() error { | ||
273 | s.running = false | ||
274 | if s.listener != nil { | ||
275 | err := s.listener.Close() | ||
276 | s.listener = nil | ||
277 | return err | ||
278 | } | ||
279 | return nil | ||
280 | } | ||