aboutsummaryrefslogtreecommitdiff
path: root/src/gnunet/service/connection.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/gnunet/service/connection.go')
-rw-r--r--src/gnunet/service/connection.go280
1 files changed, 280 insertions, 0 deletions
diff --git a/src/gnunet/service/connection.go b/src/gnunet/service/connection.go
new file mode 100644
index 0000000..1c690c5
--- /dev/null
+++ b/src/gnunet/service/connection.go
@@ -0,0 +1,280 @@
1// This file is part of gnunet-go, a GNUnet-implementation in Golang.
2// Copyright (C) 2019-2022 Bernd Fix >Y<
3//
4// gnunet-go is free software: you can redistribute it and/or modify it
5// under the terms of the GNU Affero General Public License as published
6// by the Free Software Foundation, either version 3 of the License,
7// or (at your option) any later version.
8//
9// gnunet-go is distributed in the hope that it will be useful, but
10// WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12// Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <http://www.gnu.org/licenses/>.
16//
17// SPDX-License-Identifier: AGPL3.0-or-later
18
19package service
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "gnunet/message"
26 "net"
27 "os"
28 "strconv"
29
30 "github.com/bfix/gospel/data"
31 "github.com/bfix/gospel/logger"
32)
33
34// Error codes
35var (
36 ErrConnectionNotOpened = errors.New("channel not opened")
37 ErrConnectionInterrupted = errors.New("channel interrupted")
38)
39
40//======================================================================
41
42// Connection is a channel for GNUnet message exchange (send/receive)
43// based on Unix domain sockets. It is used locally by services and
44// clients in the standard GNUnet environment.
45type Connection struct {
46 path string // file name of Unix socket
47 conn net.Conn // associated connection
48 buf []byte // read/write buffer
49}
50
51// NewConnection creates a new connection to a socket with given path.
52// This is used by clients to connect to a service.
53func NewConnection(ctx context.Context, path string) (s *Connection, err error) {
54 var d net.Dialer
55 s = new(Connection)
56 s.path = path
57 s.buf = make([]byte, 65536)
58 s.conn, err = d.DialContext(ctx, "unix", path)
59 return
60}
61
62// Close a socket connection
63func (s *Connection) Close() error {
64 if s.conn != nil {
65 rc := s.conn.Close()
66 s.conn = nil
67 return rc
68 }
69 return ErrConnectionNotOpened
70}
71
72// Send a GNUnet message over a socket.
73func (s *Connection) Send(ctx context.Context, msg message.Message) error {
74 // convert message to binary data
75 data, err := data.Marshal(msg)
76 if err != nil {
77 return err
78 }
79 // check message header size and packet size
80 mh, err := message.GetMsgHeader(data)
81 if err != nil {
82 return err
83 }
84 if len(data) != int(mh.MsgSize) {
85 return errors.New("send: message size mismatch")
86 }
87
88 // send packet
89 n, err := s.write(ctx, data)
90 if err != nil {
91 return err
92 }
93 if n != len(data) {
94 return errors.New("incomplete send")
95 }
96 return nil
97}
98
99// Receive GNUnet messages from socket.
100func (s *Connection) Receive(ctx context.Context) (message.Message, error) {
101 // get bytes from socket
102 get := func(pos, count int) error {
103 n, err := s.read(ctx, s.buf[pos:pos+count])
104 if err != nil {
105 return err
106 }
107 if n != count {
108 return errors.New("not enough bytes on network")
109 }
110 return nil
111 }
112 // read header first
113 if err := get(0, 4); err != nil {
114 return nil, err
115 }
116 mh, err := message.GetMsgHeader(s.buf[:4])
117 if err != nil {
118 return nil, err
119 }
120 // get rest of message
121 if err := get(4, int(mh.MsgSize)-4); err != nil {
122 return nil, err
123 }
124 msg, err := message.NewEmptyMessage(mh.MsgType)
125 if err != nil {
126 return nil, err
127 }
128 if msg == nil {
129 return nil, fmt.Errorf("message{%d} is nil", mh.MsgType)
130 }
131 if err = data.Unmarshal(msg, s.buf[:mh.MsgSize]); err != nil {
132 return nil, err
133 }
134 return msg, nil
135}
136
137//----------------------------------------------------------------------
138// internal methods
139//----------------------------------------------------------------------
140
141// result of read/write operations on sockets.
142type result struct {
143 n int // number of bytes read/written
144 err error // error (or nil)
145}
146
147// Read bytes from a socket into buffer: Returns the number of read
148// bytes and an error code. Only works on open channels ;)
149func (s *Connection) read(ctx context.Context, buf []byte) (int, error) {
150 // check if the channel is open
151 if s.conn == nil {
152 return 0, ErrConnectionNotOpened
153 }
154 // perform read operation
155 ch := make(chan *result)
156 go func() {
157 n, err := s.conn.Read(buf)
158 ch <- &result{n, err}
159 }()
160 for {
161 select {
162 // terminate on request
163 case <-ctx.Done():
164 return 0, ErrConnectionInterrupted
165
166 // handle result of read operation
167 case res := <-ch:
168 return res.n, res.err
169 }
170 }
171}
172
173// Write buffer to socket and returns the number of bytes written and an
174// optional error code.
175func (s *Connection) write(ctx context.Context, buf []byte) (int, error) {
176 // check if we have an open socket to write to.
177 if s.conn == nil {
178 return 0, ErrConnectionNotOpened
179 }
180 // perform write operation
181 ch := make(chan *result)
182 go func() {
183 n, err := s.conn.Write(buf)
184 ch <- &result{n, err}
185 }()
186 for {
187 select {
188 // handle terminate command
189 case <-ctx.Done():
190 return 0, ErrConnectionInterrupted
191
192 // handle result of write operation
193 case res := <-ch:
194 return res.n, res.err
195 }
196 }
197}
198
199//======================================================================
200
201// ConnectionManager to handle client connections on a socket.
202type ConnectionManager struct {
203 listener net.Listener // reference to listener object
204 running bool // server running?
205}
206
207// NewConnectionManager creates a new socket connection manager. Incoming
208// connections from clients are dispatched to a handler channel.
209func NewConnectionManager(
210 ctx context.Context, // execution context
211 path string, // socket file name
212 params map[string]string, // connection parameters
213 hdlr chan *Connection, // handler for incoming connections
214) (cs *ConnectionManager, err error) {
215
216 // instantiate channel server
217 cs = &ConnectionManager{
218 listener: nil,
219 running: false,
220 }
221 // create listener
222 var lc net.ListenConfig
223 if cs.listener, err = lc.Listen(ctx, "unix", path); err != nil {
224 return
225 }
226 // handle additional parameters
227 if params != nil {
228 for key, value := range params {
229 switch key {
230 case "perm": // set permissions on 'unix'
231 if perm, err := strconv.ParseInt(value, 8, 32); err == nil {
232 if err := os.Chmod(path, os.FileMode(perm)); err != nil {
233 logger.Printf(
234 logger.ERROR,
235 "MsgChannelServer: Failed to set permissions %s on %s: %s\n",
236 path, value, err.Error())
237
238 }
239 } else {
240 logger.Printf(
241 logger.ERROR,
242 "MsgChannelServer: Invalid permissions '%s'\n",
243 value)
244 }
245 }
246 }
247 }
248 // run go routine to handle channel requests from clients
249 cs.running = true
250 go func() {
251 for cs.running {
252 conn, err := cs.listener.Accept()
253 if err != nil {
254 break
255 }
256 // handle connection
257 c := &Connection{
258 conn: conn,
259 path: path,
260 buf: make([]byte, 65536),
261 }
262 hdlr <- c
263 }
264 if cs.listener != nil {
265 cs.listener.Close()
266 }
267 }()
268 return cs, nil
269}
270
271// Close a network channel server (= stop the server)
272func (s *ConnectionManager) Close() error {
273 s.running = false
274 if s.listener != nil {
275 err := s.listener.Close()
276 s.listener = nil
277 return err
278 }
279 return nil
280}