aboutsummaryrefslogtreecommitdiff
path: root/src/gnunet/service/gns/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/gnunet/service/gns/service.go')
-rw-r--r--src/gnunet/service/gns/service.go235
1 files changed, 120 insertions, 115 deletions
diff --git a/src/gnunet/service/gns/service.go b/src/gnunet/service/gns/service.go
index 6fa1eb2..326d831 100644
--- a/src/gnunet/service/gns/service.go
+++ b/src/gnunet/service/gns/service.go
@@ -19,6 +19,7 @@
19package gns 19package gns
20 20
21import ( 21import (
22 "context"
22 "encoding/hex" 23 "encoding/hex"
23 "fmt" 24 "fmt"
24 "io" 25 "io"
@@ -28,8 +29,8 @@ import (
28 "gnunet/enums" 29 "gnunet/enums"
29 "gnunet/message" 30 "gnunet/message"
30 "gnunet/service" 31 "gnunet/service"
32 "gnunet/service/dht/blocks"
31 "gnunet/service/revocation" 33 "gnunet/service/revocation"
32 "gnunet/transport"
33 "gnunet/util" 34 "gnunet/util"
34 35
35 "github.com/bfix/gospel/data" 36 "github.com/bfix/gospel/data"
@@ -64,115 +65,117 @@ func NewService() service.Service {
64 return inst 65 return inst
65} 66}
66 67
67// Start the GNS service
68func (s *Service) Start(spec string) error {
69 return nil
70}
71
72// Stop the GNS service
73func (s *Service) Stop() error {
74 return nil
75}
76
77// ServeClient processes a client channel. 68// ServeClient processes a client channel.
78func (s *Service) ServeClient(ctx *service.SessionContext, mc *transport.MsgChannel) { 69func (s *Service) ServeClient(ctx context.Context, id int, mc *service.Connection) {
79 reqID := 0 70 reqID := 0
80loop: 71 var cancel context.CancelFunc
72 ctx, cancel = context.WithCancel(ctx)
73
81 for { 74 for {
82 // receive next message from client 75 // receive next message from client
83 reqID++ 76 reqID++
84 logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client request...\n", ctx.ID, reqID) 77 logger.Printf(logger.DBG, "[gns:%d:%d] Waiting for client request...\n", id, reqID)
85 msg, err := mc.Receive(ctx.Signaller()) 78 msg, err := mc.Receive(ctx)
86 if err != nil { 79 if err != nil {
87 if err == io.EOF { 80 if err == io.EOF {
88 logger.Printf(logger.INFO, "[gns:%d:%d] Client channel closed.\n", ctx.ID, reqID) 81 logger.Printf(logger.INFO, "[gns:%d:%d] Client channel closed.\n", id, reqID)
89 } else if err == transport.ErrChannelInterrupted { 82 } else if err == service.ErrConnectionInterrupted {
90 logger.Printf(logger.INFO, "[gns:%d:%d] Service operation interrupted.\n", ctx.ID, reqID) 83 logger.Printf(logger.INFO, "[gns:%d:%d] Service operation interrupted.\n", id, reqID)
91 } else { 84 } else {
92 logger.Printf(logger.ERROR, "[gns:%d:%d] Message-receive failed: %s\n", ctx.ID, reqID, err.Error()) 85 logger.Printf(logger.ERROR, "[gns:%d:%d] Message-receive failed: %s\n", id, reqID, err.Error())
93 } 86 }
94 break loop 87 break
95 } 88 }
96 logger.Printf(logger.INFO, "[gns:%d:%d] Received request: %v\n", ctx.ID, reqID, msg) 89 logger.Printf(logger.INFO, "[gns:%d:%d] Received request: %v\n", id, reqID, msg)
97 90
98 // perform lookup 91 // handle message
99 switch m := msg.(type) { 92 s.HandleMessage(context.WithValue(ctx, "label", fmt.Sprintf(":%d:%d", id, reqID)), msg, mc)
100 case *message.LookupMsg: 93 }
101 //---------------------------------------------------------- 94 // close client connection
102 // GNS_LOOKUP 95 mc.Close()
103 //---------------------------------------------------------- 96
104 97 // cancel all tasks running for this session/connection
105 // perform lookup on block (locally and remote) 98 logger.Printf(logger.INFO, "[gns:%d] Start closing session...\n", id)
106 go func(id int, m *message.LookupMsg) { 99 cancel()
107 logger.Printf(logger.INFO, "[gns:%d:%d] Lookup request received.\n", ctx.ID, id) 100}
108 resp := message.NewGNSLookupResultMsg(m.ID) 101
109 ctx.Add() 102// Handle a single incoming message
110 defer func() { 103func (s *Service) HandleMessage(ctx context.Context, msg message.Message, back service.Responder) bool {
111 // send response 104 // assemble log label
112 if resp != nil { 105 label := ""
113 if err := mc.Send(resp, ctx.Signaller()); err != nil { 106 if v := ctx.Value("label"); v != nil {
114 logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to send response: %s\n", ctx.ID, id, err.Error()) 107 label = v.(string)
115 } 108 }
116 } 109 // perform lookup
117 // go-routine finished 110 switch m := msg.(type) {
118 logger.Printf(logger.DBG, "[gns:%d:%d] Lookup request finished.\n", ctx.ID, id) 111 case *message.LookupMsg:
119 ctx.Remove() 112 //----------------------------------------------------------
120 }() 113 // GNS_LOOKUP
121 114 //----------------------------------------------------------
122 label := m.GetName() 115
123 kind := NewRRTypeList(int(m.Type)) 116 // perform lookup on block (locally and remote)
124 recset, err := s.Resolve(ctx, label, m.Zone, kind, int(m.Options), 0) 117 go func(m *message.LookupMsg) {
125 if err != nil { 118 logger.Printf(logger.INFO, "[gns%s] Lookup request received.\n", label)
126 logger.Printf(logger.ERROR, "[gns:%d:%d] Failed to lookup block: %s\n", ctx.ID, id, err.Error()) 119 resp := message.NewGNSLookupResultMsg(m.ID)
127 if err == transport.ErrChannelInterrupted { 120 defer func() {
128 resp = nil 121 // send response
122 if resp != nil {
123 if err := back.Send(ctx, resp); err != nil {
124 logger.Printf(logger.ERROR, "[gns%s] Failed to send response: %s\n", label, err.Error())
129 } 125 }
126 }
127 // go-routine finished
128 logger.Printf(logger.DBG, "[gns%s] Lookup request finished.\n", label)
129 }()
130
131 label := m.GetName()
132 kind := NewRRTypeList(int(m.Type))
133 recset, err := s.Resolve(ctx, label, m.Zone, kind, int(m.Options), 0)
134 if err != nil {
135 logger.Printf(logger.ERROR, "[gns%s] Failed to lookup block: %s\n", label, err.Error())
136 if err == service.ErrConnectionInterrupted {
137 resp = nil
138 }
139 return
140 }
141 // handle records
142 if recset != nil {
143 logger.Printf(logger.DBG, "[gns%s] Received record set with %d entries\n", label, recset.Count)
144
145 // get records from block
146 if recset.Count == 0 {
147 logger.Printf(logger.WARN, "[gns%s] No records in block\n", label)
130 return 148 return
131 } 149 }
132 // handle records 150 // process records
133 if recset != nil { 151 for i, rec := range recset.Records {
134 logger.Printf(logger.DBG, "[gns:%d:%d] Received record set with %d entries\n", ctx.ID, id, recset.Count) 152 logger.Printf(logger.DBG, "[gns%s] Record #%d: %v\n", label, i, rec)
135 153
136 // get records from block 154 // is this the record type we are looking for?
137 if recset.Count == 0 { 155 if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY {
138 logger.Printf(logger.WARN, "[gns:%d:%d] No records in block\n", ctx.ID, id) 156 // add it to the response message
139 return 157 resp.AddRecord(rec)
140 }
141 // process records
142 for i, rec := range recset.Records {
143 logger.Printf(logger.DBG, "[gns:%d:%d] Record #%d: %v\n", ctx.ID, id, i, rec)
144
145 // is this the record type we are looking for?
146 if rec.Type == m.Type || int(m.Type) == enums.GNS_TYPE_ANY {
147 // add it to the response message
148 resp.AddRecord(rec)
149 }
150 } 158 }
151 } 159 }
152 }(reqID, m) 160 }
153 161 }(m)
154 default:
155 //----------------------------------------------------------
156 // UNKNOWN message type received
157 //----------------------------------------------------------
158 logger.Printf(logger.ERROR, "[gns:%d:%d] Unhandled message of type (%d)\n", ctx.ID, reqID, msg.Header().MsgType)
159 break loop
160 }
161 }
162 // close client connection
163 mc.Close()
164 162
165 // cancel all tasks running for this session/connection 163 default:
166 logger.Printf(logger.INFO, "[gns:%d] Start closing session... [%d]\n", ctx.ID, ctx.Waiting()) 164 //----------------------------------------------------------
167 ctx.Cancel() 165 // UNKNOWN message type received
166 //----------------------------------------------------------
167 logger.Printf(logger.ERROR, "[gns%s] Unhandled message of type (%d)\n", label, msg.Header().MsgType)
168 return false
169 }
170 return true
168} 171}
169 172
170//====================================================================== 173//======================================================================
171// Revocationrelated methods 174// Revocation-related methods
172//====================================================================== 175//======================================================================
173 176
174// QueryKeyRevocation checks if a key has been revoked 177// QueryKeyRevocation checks if a key has been revoked
175func (s *Service) QueryKeyRevocation(ctx *service.SessionContext, zkey *crypto.ZoneKey) (valid bool, err error) { 178func (s *Service) QueryKeyRevocation(ctx context.Context, zkey *crypto.ZoneKey) (valid bool, err error) {
176 logger.Printf(logger.DBG, "[gns] QueryKeyRev(%s)...\n", util.EncodeBinaryToString(zkey.Bytes())) 179 logger.Printf(logger.DBG, "[gns] QueryKeyRev(%s)...\n", util.EncodeBinaryToString(zkey.Bytes()))
177 180
178 // assemble request 181 // assemble request
@@ -180,7 +183,7 @@ func (s *Service) QueryKeyRevocation(ctx *service.SessionContext, zkey *crypto.Z
180 183
181 // get response from Revocation service 184 // get response from Revocation service
182 var resp message.Message 185 var resp message.Message
183 if resp, err = service.RequestResponse(ctx, "gns", "Revocation", config.Cfg.Revocation.Endpoint, req); err != nil { 186 if resp, err = service.RequestResponse(ctx, "gns", "Revocation", config.Cfg.Revocation.Service.Socket, req); err != nil {
184 return 187 return
185 } 188 }
186 189
@@ -195,7 +198,7 @@ func (s *Service) QueryKeyRevocation(ctx *service.SessionContext, zkey *crypto.Z
195} 198}
196 199
197// RevokeKey revokes a key with given revocation data 200// RevokeKey revokes a key with given revocation data
198func (s *Service) RevokeKey(ctx *service.SessionContext, rd *revocation.RevData) (success bool, err error) { 201func (s *Service) RevokeKey(ctx context.Context, rd *revocation.RevData) (success bool, err error) {
199 logger.Printf(logger.DBG, "[gns] RevokeKey(%s)...\n", rd.ZoneKeySig.ID()) 202 logger.Printf(logger.DBG, "[gns] RevokeKey(%s)...\n", rd.ZoneKeySig.ID())
200 203
201 // assemble request 204 // assemble request
@@ -206,7 +209,7 @@ func (s *Service) RevokeKey(ctx *service.SessionContext, rd *revocation.RevData)
206 209
207 // get response from Revocation service 210 // get response from Revocation service
208 var resp message.Message 211 var resp message.Message
209 if resp, err = service.RequestResponse(ctx, "gns", "Revocation", config.Cfg.Revocation.Endpoint, req); err != nil { 212 if resp, err = service.RequestResponse(ctx, "gns", "Revocation", config.Cfg.Revocation.Service.Socket, req); err != nil {
210 return 213 return
211 } 214 }
212 215
@@ -225,17 +228,17 @@ func (s *Service) RevokeKey(ctx *service.SessionContext, rd *revocation.RevData)
225//====================================================================== 228//======================================================================
226 229
227// LookupNamecache returns a cached lookup (if available) 230// LookupNamecache returns a cached lookup (if available)
228func (s *Service) LookupNamecache(ctx *service.SessionContext, query *Query) (block *message.Block, err error) { 231func (s *Service) LookupNamecache(ctx context.Context, query *blocks.GNSQuery) (block *blocks.GNSBlock, err error) {
229 logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key.Bits)) 232 logger.Printf(logger.DBG, "[gns] LookupNamecache(%s)...\n", hex.EncodeToString(query.Key().Bits))
230 233
231 // assemble Namecache request 234 // assemble Namecache request
232 req := message.NewNamecacheLookupMsg(query.Key) 235 req := message.NewNamecacheLookupMsg(query.Key())
233 req.ID = uint32(util.NextID()) 236 req.ID = uint32(util.NextID())
234 block = nil 237 block = nil
235 238
236 // get response from Namecache service 239 // get response from Namecache service
237 var resp message.Message 240 var resp message.Message
238 if resp, err = service.RequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { 241 if resp, err = service.RequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Service.Socket, req); err != nil {
239 return 242 return
240 } 243 }
241 244
@@ -250,7 +253,7 @@ func (s *Service) LookupNamecache(ctx *service.SessionContext, query *Query) (bl
250 break 253 break
251 } 254 }
252 // check if block was found 255 // check if block was found
253 if len(m.EncData) == 0 || util.IsNull(m.EncData) { 256 if len(m.EncData) == 0 || util.IsAll(m.EncData, 0) {
254 logger.Println(logger.DBG, "[gns] block not found in namecache") 257 logger.Println(logger.DBG, "[gns] block not found in namecache")
255 break 258 break
256 } 259 }
@@ -262,21 +265,21 @@ func (s *Service) LookupNamecache(ctx *service.SessionContext, query *Query) (bl
262 } 265 }
263 266
264 // assemble GNSBlock from message 267 // assemble GNSBlock from message
265 block = new(message.Block) 268 block = new(blocks.GNSBlock)
266 block.DerivedKeySig = m.DerivedKeySig 269 block.DerivedKeySig = m.DerivedKeySig
267 sb := new(message.SignedBlockData) 270 sb := new(blocks.SignedGNSBlockData)
268 sb.Purpose = new(crypto.SignaturePurpose) 271 sb.Purpose = new(crypto.SignaturePurpose)
269 sb.Purpose.Purpose = enums.SIG_GNS_RECORD_SIGN 272 sb.Purpose.Purpose = enums.SIG_GNS_RECORD_SIGN
270 sb.Purpose.Size = uint32(16 + len(m.EncData)) 273 sb.Purpose.Size = uint32(16 + len(m.EncData))
271 sb.Expire = m.Expire 274 sb.Expire = m.Expire
272 sb.EncData = m.EncData 275 sb.Data = m.EncData
273 block.Block = sb 276 block.Body = sb
274 277
275 // verify and decrypt block 278 // verify and decrypt block
276 if err = block.Verify(query.Zone, query.Label); err != nil { 279 if err = query.Verify(block); err != nil {
277 break 280 break
278 } 281 }
279 if err = block.Decrypt(query.Zone, query.Label); err != nil { 282 if err = query.Decrypt(block); err != nil {
280 break 283 break
281 } 284 }
282 default: 285 default:
@@ -287,7 +290,7 @@ func (s *Service) LookupNamecache(ctx *service.SessionContext, query *Query) (bl
287} 290}
288 291
289// StoreNamecache stores a lookup in the local namecache. 292// StoreNamecache stores a lookup in the local namecache.
290func (s *Service) StoreNamecache(ctx *service.SessionContext, block *message.Block) (err error) { 293func (s *Service) StoreNamecache(ctx context.Context, query *blocks.GNSQuery, block *blocks.GNSBlock) (err error) {
291 logger.Println(logger.DBG, "[gns] StoreNamecache()...") 294 logger.Println(logger.DBG, "[gns] StoreNamecache()...")
292 295
293 // assemble Namecache request 296 // assemble Namecache request
@@ -296,7 +299,7 @@ func (s *Service) StoreNamecache(ctx *service.SessionContext, block *message.Blo
296 299
297 // get response from Namecache service 300 // get response from Namecache service
298 var resp message.Message 301 var resp message.Message
299 if resp, err = service.RequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Endpoint, req); err != nil { 302 if resp, err = service.RequestResponse(ctx, "gns", "Namecache", config.Cfg.Namecache.Service.Socket, req); err != nil {
300 return 303 return
301 } 304 }
302 305
@@ -327,13 +330,13 @@ func (s *Service) StoreNamecache(ctx *service.SessionContext, block *message.Blo
327//====================================================================== 330//======================================================================
328 331
329// LookupDHT gets a GNS block from the DHT for the given query key. 332// LookupDHT gets a GNS block from the DHT for the given query key.
330func (s *Service) LookupDHT(ctx *service.SessionContext, query *Query) (block *message.Block, err error) { 333func (s *Service) LookupDHT(ctx context.Context, query blocks.Query) (block blocks.Block, err error) {
331 logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key.Bits)) 334 logger.Printf(logger.DBG, "[gns] LookupDHT(%s)...\n", hex.EncodeToString(query.Key().Bits))
332 block = nil 335 block = nil
333 336
334 // client-connect to the DHT service 337 // client-connect to the DHT service
335 logger.Println(logger.DBG, "[gns] Connecting to DHT service...") 338 logger.Println(logger.DBG, "[gns] Connecting to DHT service...")
336 cl, err := service.NewClient(config.Cfg.DHT.Endpoint) 339 cl, err := service.NewClient(ctx, config.Cfg.DHT.Service.Socket)
337 if err != nil { 340 if err != nil {
338 return nil, err 341 return nil, err
339 } 342 }
@@ -360,7 +363,7 @@ func (s *Service) LookupDHT(ctx *service.SessionContext, query *Query) (block *m
360 ) 363 )
361 364
362 // send DHT GET request and wait for response 365 // send DHT GET request and wait for response
363 reqGet := message.NewDHTClientGetMsg(query.Key) 366 reqGet := message.NewDHTClientGetMsg(query.Key())
364 reqGet.ID = uint64(util.NextID()) 367 reqGet.ID = uint64(util.NextID())
365 reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL) 368 reqGet.ReplLevel = uint32(enums.DHT_GNS_REPLICATION_LEVEL)
366 reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD) 369 reqGet.Type = uint32(enums.BLOCK_TYPE_GNS_NAMERECORD)
@@ -368,16 +371,16 @@ func (s *Service) LookupDHT(ctx *service.SessionContext, query *Query) (block *m
368 371
369 if err = interact(reqGet, true); err != nil { 372 if err = interact(reqGet, true); err != nil {
370 // check for aborted remote lookup: we need to cancel the query 373 // check for aborted remote lookup: we need to cancel the query
371 if err == transport.ErrChannelInterrupted { 374 if err == service.ErrConnectionInterrupted {
372 logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.") 375 logger.Println(logger.WARN, "[gns] remote Lookup aborted -- cleaning up.")
373 376
374 // send DHT GET_STOP request and terminate 377 // send DHT GET_STOP request and terminate
375 reqStop := message.NewDHTClientGetStopMsg(query.Key) 378 reqStop := message.NewDHTClientGetStopMsg(query.Key())
376 reqStop.ID = reqGet.ID 379 reqStop.ID = reqGet.ID
377 if err = interact(reqStop, false); err != nil { 380 if err = interact(reqStop, false); err != nil {
378 logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error()) 381 logger.Printf(logger.ERROR, "[gns] remote Lookup abort failed: %s\n", err.Error())
379 } 382 }
380 return nil, transport.ErrChannelInterrupted 383 return nil, service.ErrConnectionInterrupted
381 } 384 }
382 } 385 }
383 386
@@ -407,22 +410,24 @@ func (s *Service) LookupDHT(ctx *service.SessionContext, query *Query) (block *m
407 } 410 }
408 411
409 // get GNSBlock from message 412 // get GNSBlock from message
410 block = message.NewBlock() 413 qGNS := query.(*blocks.GNSQuery)
414 block = new(blocks.GNSBlock)
411 if err = data.Unmarshal(block, m.Data); err != nil { 415 if err = data.Unmarshal(block, m.Data); err != nil {
412 logger.Printf(logger.ERROR, "[gns] can't read GNS block: %s\n", err.Error()) 416 logger.Printf(logger.ERROR, "[gns] can't read GNS block: %s\n", err.Error())
413 break 417 break
414 } 418 }
419
415 // verify and decrypt block 420 // verify and decrypt block
416 if err = block.Verify(query.Zone, query.Label); err != nil { 421 if err = qGNS.Verify(block); err != nil {
417 break 422 break
418 } 423 }
419 if err = block.Decrypt(query.Zone, query.Label); err != nil { 424 if err = qGNS.Decrypt(block); err != nil {
420 break 425 break
421 } 426 }
422 427
423 // we got a result from DHT that was not in the namecache, 428 // we got a result from DHT that was not in the namecache,
424 // so store it there now. 429 // so store it there now.
425 if err = s.StoreNamecache(ctx, block); err != nil { 430 if err = s.StoreNamecache(ctx, qGNS, block.(*blocks.GNSBlock)); err != nil {
426 logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error()) 431 logger.Printf(logger.ERROR, "[gns] can't store block in Namecache: %s\n", err.Error())
427 } 432 }
428 } 433 }