aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_server.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
committerChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
commit9727e5e53721dace7abbcc5bcd28c838af4291cc (patch)
treeca32ed19cf0d4129d3497261531aa40a19599280 /src/fs/gnunet-service-fs_cadet_server.c
parentc793bffc39fe1445616c9d0cb071d62575dea217 (diff)
downloadgnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.tar.gz
gnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.zip
convert to new CADET API, not working due to CADET-API internal bugs
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_server.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c286
1 files changed, 115 insertions, 171 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index ac86537c3..0a72a8279 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -124,9 +124,9 @@ struct CadetClient
124 124
125 125
126/** 126/**
127 * Listen channel for incoming requests. 127 * Listen port for incoming requests.
128 */ 128 */
129static struct GNUNET_CADET_Handle *listen_channel; 129static struct GNUNET_CADET_Port *cadet_port;
130 130
131/** 131/**
132 * Head of DLL of cadet clients. 132 * Head of DLL of cadet clients.
@@ -188,121 +188,29 @@ refresh_timeout_task (struct CadetClient *sc)
188 188
189 189
190/** 190/**
191 * We're done handling a request from a client, read the next one. 191 * Check if we are done with the write queue, and if so tell CADET
192 * that we are ready to read more.
192 * 193 *
193 * @param sc client to continue reading requests from 194 * @param cls where to process the write queue
194 */ 195 */
195static void 196static void
196continue_reading (struct CadetClient *sc) 197continue_writing (void *cls)
197{
198 refresh_timeout_task (sc);
199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200 "Finished processing cadet request from client %p, ready to receive the next one\n",
201 sc);
202 GNUNET_CADET_receive_done (sc->channel);
203}
204
205
206/**
207 * Transmit the next entry from the write queue.
208 *
209 * @param sc where to process the write queue
210 */
211static void
212continue_writing (struct CadetClient *sc);
213
214
215/**
216 * Send a reply now, cadet is ready.
217 *
218 * @param cls closure with the `struct CadetClient` which sent the query
219 * @param size number of bytes available in @a buf
220 * @param buf where to write the message
221 * @return number of bytes written to @a buf
222 */
223static size_t
224write_continuation (void *cls,
225 size_t size,
226 void *buf)
227{ 198{
228 struct CadetClient *sc = cls; 199 struct CadetClient *sc = cls;
229 struct GNUNET_CADET_Channel *tun; 200 struct GNUNET_MQ_Handle *mq;
230 struct WriteQueueItem *wqi;
231 size_t ret;
232
233 sc->wh = NULL;
234 if (NULL == (wqi = sc->wqi_head))
235 {
236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
237 "Write queue empty, reading more requests\n");
238 return 0;
239 }
240 if ( (0 == size) ||
241 (size < wqi->msize) )
242 {
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Transmission of reply failed, terminating cadet\n");
245 tun = sc->channel;
246 sc->channel = NULL;
247 GNUNET_CADET_channel_destroy (tun);
248 return 0;
249 }
250 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
251 sc->wqi_tail,
252 wqi);
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254 "Transmitted %u byte reply via cadet to %p\n",
255 (unsigned int) size,
256 sc);
257 GNUNET_STATISTICS_update (GSF_stats,
258 gettext_noop ("# Blocks transferred via cadet"), 1,
259 GNUNET_NO);
260 ret = wqi->msize;
261 GNUNET_memcpy (buf, &wqi[1], ret);
262 GNUNET_free (wqi);
263 continue_writing (sc);
264 return ret;
265}
266
267
268/**
269 * Transmit the next entry from the write queue.
270 *
271 * @param sc where to process the write queue
272 */
273static void
274continue_writing (struct CadetClient *sc)
275{
276 struct WriteQueueItem *wqi;
277 struct GNUNET_CADET_Channel *tun;
278 201
279 if (NULL != sc->wh) 202 mq = GNUNET_CADET_get_mq (sc->channel);
203 if (0 != GNUNET_MQ_get_length (mq))
280 { 204 {
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Write pending, waiting for it to complete\n"); 206 "Write pending, waiting for it to complete\n");
283 return; /* write already pending */
284 }
285 if (NULL == (wqi = sc->wqi_head))
286 {
287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288 "Write queue empty, reading more requests\n");
289 continue_reading (sc);
290 return;
291 }
292 sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
293 GNUNET_TIME_UNIT_FOREVER_REL,
294 wqi->msize,
295 &write_continuation,
296 sc);
297 if (NULL == sc->wh)
298 {
299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300 "Write failed; terminating cadet\n");
301 tun = sc->channel;
302 sc->channel = NULL;
303 GNUNET_CADET_channel_destroy (tun);
304 return; 207 return;
305 } 208 }
209 refresh_timeout_task (sc);
210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
211 "Finished processing cadet request from client %p, ready to receive the next one\n",
212 sc);
213 GNUNET_CADET_receive_done (sc->channel);
306} 214}
307 215
308 216
@@ -333,7 +241,7 @@ handle_datastore_reply (void *cls,
333{ 241{
334 struct CadetClient *sc = cls; 242 struct CadetClient *sc = cls;
335 size_t msize = size + sizeof (struct CadetReplyMessage); 243 size_t msize = size + sizeof (struct CadetReplyMessage);
336 struct WriteQueueItem *wqi; 244 struct GNUNET_MQ_Envelope *env;
337 struct CadetReplyMessage *srm; 245 struct CadetReplyMessage *srm;
338 246
339 sc->qe = NULL; 247 sc->qe = NULL;
@@ -357,7 +265,8 @@ handle_datastore_reply (void *cls,
357 GNUNET_h2s (key)); 265 GNUNET_h2s (key));
358 } 266 }
359 GNUNET_STATISTICS_update (GSF_stats, 267 GNUNET_STATISTICS_update (GSF_stats,
360 gettext_noop ("# queries received via CADET not answered"), 1, 268 gettext_noop ("# queries received via CADET not answered"),
269 1,
361 GNUNET_NO); 270 GNUNET_NO);
362 continue_writing (sc); 271 continue_writing (sc);
363 return; 272 return;
@@ -369,9 +278,13 @@ handle_datastore_reply (void *cls,
369 GNUNET_h2s (key)); 278 GNUNET_h2s (key));
370 if (GNUNET_OK != 279 if (GNUNET_OK !=
371 GNUNET_FS_handle_on_demand_block (key, 280 GNUNET_FS_handle_on_demand_block (key,
372 size, data, type, 281 size,
373 priority, anonymity, 282 data,
374 expiration, uid, 283 type,
284 priority,
285 anonymity,
286 expiration,
287 uid,
375 &handle_datastore_reply, 288 &handle_datastore_reply,
376 sc)) 289 sc))
377 { 290 {
@@ -394,19 +307,23 @@ handle_datastore_reply (void *cls,
394 (unsigned int) type, 307 (unsigned int) type,
395 GNUNET_h2s (key), 308 GNUNET_h2s (key),
396 sc); 309 sc);
397 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); 310 env = GNUNET_MQ_msg_extra (srm,
398 wqi->msize = msize; 311 size,
399 srm = (struct CadetReplyMessage *) &wqi[1]; 312 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
400 srm->header.size = htons ((uint16_t) msize);
401 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
402 srm->type = htonl (type); 313 srm->type = htonl (type);
403 srm->expiration = GNUNET_TIME_absolute_hton (expiration); 314 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
404 GNUNET_memcpy (&srm[1], data, size); 315 GNUNET_memcpy (&srm[1],
405 sc->reply_size = msize; 316 data,
406 GNUNET_CONTAINER_DLL_insert (sc->wqi_head, 317 size);
407 sc->wqi_tail, 318 GNUNET_MQ_notify_sent (env,
408 wqi); 319 &continue_writing,
409 continue_writing (sc); 320 sc);
321 GNUNET_STATISTICS_update (GSF_stats,
322 gettext_noop ("# Blocks transferred via cadet"),
323 1,
324 GNUNET_NO);
325 GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
326 env);
410} 327}
411 328
412 329
@@ -414,30 +331,22 @@ handle_datastore_reply (void *cls,
414 * Functions with this signature are called whenever a 331 * Functions with this signature are called whenever a
415 * complete query message is received. 332 * complete query message is received.
416 * 333 *
417 * Do not call #GNUNET_SERVER_mst_destroy() in callback
418 *
419 * @param cls closure with the `struct CadetClient` 334 * @param cls closure with the `struct CadetClient`
420 * @param channel channel handle 335 * @param sqm the actual message
421 * @param channel_ctx channel context
422 * @param message the actual message
423 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
424 */ 336 */
425static int 337static void
426request_cb (void *cls, 338handle_request (void *cls,
427 struct GNUNET_CADET_Channel *channel, 339 const struct CadetQueryMessage *sqm)
428 void **channel_ctx,
429 const struct GNUNET_MessageHeader *message)
430{ 340{
431 struct CadetClient *sc = *channel_ctx; 341 struct CadetClient *sc = cls;
432 const struct CadetQueryMessage *sqm;
433 342
434 sqm = (const struct CadetQueryMessage *) message;
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 343 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436 "Received query for `%s' via cadet from client %p\n", 344 "Received query for `%s' via cadet from client %p\n",
437 GNUNET_h2s (&sqm->query), 345 GNUNET_h2s (&sqm->query),
438 sc); 346 sc);
439 GNUNET_STATISTICS_update (GSF_stats, 347 GNUNET_STATISTICS_update (GSF_stats,
440 gettext_noop ("# queries received via cadet"), 1, 348 gettext_noop ("# queries received via cadet"),
349 1,
441 GNUNET_NO); 350 GNUNET_NO);
442 refresh_timeout_task (sc); 351 refresh_timeout_task (sc);
443 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 352 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
@@ -446,14 +355,14 @@ request_cb (void *cls,
446 ntohl (sqm->type), 355 ntohl (sqm->type),
447 0 /* priority */, 356 0 /* priority */,
448 GSF_datastore_queue_size, 357 GSF_datastore_queue_size,
449 &handle_datastore_reply, sc); 358 &handle_datastore_reply,
359 sc);
450 if (NULL == sc->qe) 360 if (NULL == sc->qe)
451 { 361 {
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 "Queueing request with datastore failed (queue full?)\n"); 363 "Queueing request with datastore failed (queue full?)\n");
454 continue_writing (sc); 364 continue_writing (sc);
455 } 365 }
456 return GNUNET_OK;
457} 366}
458 367
459 368
@@ -464,16 +373,12 @@ request_cb (void *cls,
464 * @param channel the channel representing the cadet 373 * @param channel the channel representing the cadet
465 * @param initiator the identity of the peer who wants to establish a cadet 374 * @param initiator the identity of the peer who wants to establish a cadet
466 * with us; NULL on binding error 375 * with us; NULL on binding error
467 * @param port cadet port used for the incoming connection 376 * @return initial channel context (our `struct CadetClient`)
468 * @param options channel option flags
469 * @return initial channel context (our 'struct CadetClient')
470 */ 377 */
471static void * 378static void *
472accept_cb (void *cls, 379connect_cb (void *cls,
473 struct GNUNET_CADET_Channel *channel, 380 struct GNUNET_CADET_Channel *channel,
474 const struct GNUNET_PeerIdentity *initiator, 381 const struct GNUNET_PeerIdentity *initiator)
475 const struct GNUNET_HashCode *port,
476 enum GNUNET_CADET_ChannelOption options)
477{ 382{
478 struct CadetClient *sc; 383 struct CadetClient *sc;
479 384
@@ -481,13 +386,15 @@ accept_cb (void *cls,
481 if (sc_count >= sc_count_max) 386 if (sc_count >= sc_count_max)
482 { 387 {
483 GNUNET_STATISTICS_update (GSF_stats, 388 GNUNET_STATISTICS_update (GSF_stats,
484 gettext_noop ("# cadet client connections rejected"), 1, 389 gettext_noop ("# cadet client connections rejected"),
390 1,
485 GNUNET_NO); 391 GNUNET_NO);
486 GNUNET_CADET_channel_destroy (channel); 392 GNUNET_CADET_channel_destroy (channel);
487 return NULL; 393 return NULL;
488 } 394 }
489 GNUNET_STATISTICS_update (GSF_stats, 395 GNUNET_STATISTICS_update (GSF_stats,
490 gettext_noop ("# cadet connections active"), 1, 396 gettext_noop ("# cadet connections active"),
397 1,
491 GNUNET_NO); 398 GNUNET_NO);
492 sc = GNUNET_new (struct CadetClient); 399 sc = GNUNET_new (struct CadetClient);
493 sc->channel = channel; 400 sc->channel = channel;
@@ -506,18 +413,17 @@ accept_cb (void *cls,
506 413
507/** 414/**
508 * Function called by cadet when a client disconnects. 415 * Function called by cadet when a client disconnects.
509 * Cleans up our 'struct CadetClient' of that channel. 416 * Cleans up our `struct CadetClient` of that channel.
510 * 417 *
511 * @param cls NULL 418 * @param cls our `struct CadetClient`
512 * @param channel channel of the disconnecting client 419 * @param channel channel of the disconnecting client
513 * @param channel_ctx our 'struct CadetClient' 420 * @param channel_ctx
514 */ 421 */
515static void 422static void
516cleaner_cb (void *cls, 423disconnect_cb (void *cls,
517 const struct GNUNET_CADET_Channel *channel, 424 const struct GNUNET_CADET_Channel *channel)
518 void *channel_ctx)
519{ 425{
520 struct CadetClient *sc = channel_ctx; 426 struct CadetClient *sc = cls;
521 struct WriteQueueItem *wqi; 427 struct WriteQueueItem *wqi;
522 428
523 if (NULL == sc) 429 if (NULL == sc)
@@ -552,15 +458,42 @@ cleaner_cb (void *cls,
552} 458}
553 459
554 460
461
462/**
463 * Function called whenever an MQ-channel's transmission window size changes.
464 *
465 * The first callback in an outgoing channel will be with a non-zero value
466 * and will mean the channel is connected to the destination.
467 *
468 * For an incoming channel it will be called immediately after the
469 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
470 *
471 * @param cls Channel closure.
472 * @param channel Connection to the other end (henceforth invalid).
473 * @param window_size New window size. If the is more messages than buffer size
474 * this value will be negative..
475 */
476static void
477window_change_cb (void *cls,
478 const struct GNUNET_CADET_Channel *channel,
479 int window_size)
480{
481 /* FIXME: could do flow control here... */
482}
483
484
555/** 485/**
556 * Initialize subsystem for non-anonymous file-sharing. 486 * Initialize subsystem for non-anonymous file-sharing.
557 */ 487 */
558void 488void
559GSF_cadet_start_server () 489GSF_cadet_start_server ()
560{ 490{
561 static const struct GNUNET_CADET_MessageHandler handlers[] = { 491 struct GNUNET_MQ_MessageHandler handlers[] = {
562 { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)}, 492 GNUNET_MQ_hd_fixed_size (request,
563 { NULL, 0, 0 } 493 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
494 struct CadetQueryMessage,
495 NULL),
496 GNUNET_MQ_handler_end ()
564 }; 497 };
565 struct GNUNET_HashCode port; 498 struct GNUNET_HashCode port;
566 499
@@ -573,18 +506,19 @@ GSF_cadet_start_server ()
573 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574 "Initializing cadet FS server with a limit of %llu connections\n", 507 "Initializing cadet FS server with a limit of %llu connections\n",
575 sc_count_max); 508 sc_count_max);
576 listen_channel = GNUNET_CADET_connect (GSF_cfg, 509 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
577 NULL, 510 cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
578 &cleaner_cb, 511 GNUNET_assert (NULL != cadet_handle);
579 handlers);
580 GNUNET_assert (NULL != listen_channel);
581 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 512 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
582 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 513 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
583 &port); 514 &port);
584 GNUNET_CADET_open_port (listen_channel, 515 cadet_port = GNUNET_CADET_open_porT (cadet_handle,
585 &port, 516 &port,
586 &accept_cb, 517 &connect_cb,
587 NULL); 518 NULL,
519 &window_change_cb,
520 &disconnect_cb,
521 handlers);
588} 522}
589 523
590 524
@@ -594,10 +528,20 @@ GSF_cadet_start_server ()
594void 528void
595GSF_cadet_stop_server () 529GSF_cadet_stop_server ()
596{ 530{
597 if (NULL != listen_channel) 531 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
532 &GSF_cadet_release_clients,
533 NULL);
534 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
535 cadet_map = NULL;
536 if (NULL != cadet_port)
537 {
538 GNUNET_CADET_close_port (cadet_port);
539 cadet_port = NULL;
540 }
541 if (NULL != cadet_handle)
598 { 542 {
599 GNUNET_CADET_disconnect (listen_channel); 543 GNUNET_CADET_disconnect (cadet_handle);
600 listen_channel = NULL; 544 cadet_handle = NULL;
601 } 545 }
602 GNUNET_assert (NULL == sc_head); 546 GNUNET_assert (NULL == sc_head);
603 GNUNET_assert (0 == sc_count); 547 GNUNET_assert (0 == sc_count);