diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-02-17 11:06:15 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-02-17 11:06:15 +0100 |
commit | 9727e5e53721dace7abbcc5bcd28c838af4291cc (patch) | |
tree | ca32ed19cf0d4129d3497261531aa40a19599280 /src/fs/gnunet-service-fs_cadet_server.c | |
parent | c793bffc39fe1445616c9d0cb071d62575dea217 (diff) | |
download | gnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.tar.gz gnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.zip |
convert to new CADET API, not working due to CADET-API internal bugs
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_server.c')
-rw-r--r-- | src/fs/gnunet-service-fs_cadet_server.c | 286 |
1 files changed, 115 insertions, 171 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c index ac86537c3..0a72a8279 100644 --- a/src/fs/gnunet-service-fs_cadet_server.c +++ b/src/fs/gnunet-service-fs_cadet_server.c | |||
@@ -124,9 +124,9 @@ struct CadetClient | |||
124 | 124 | ||
125 | 125 | ||
126 | /** | 126 | /** |
127 | * Listen channel for incoming requests. | 127 | * Listen port for incoming requests. |
128 | */ | 128 | */ |
129 | static struct GNUNET_CADET_Handle *listen_channel; | 129 | static struct GNUNET_CADET_Port *cadet_port; |
130 | 130 | ||
131 | /** | 131 | /** |
132 | * Head of DLL of cadet clients. | 132 | * Head of DLL of cadet clients. |
@@ -188,121 +188,29 @@ refresh_timeout_task (struct CadetClient *sc) | |||
188 | 188 | ||
189 | 189 | ||
190 | /** | 190 | /** |
191 | * We're done handling a request from a client, read the next one. | 191 | * Check if we are done with the write queue, and if so tell CADET |
192 | * that we are ready to read more. | ||
192 | * | 193 | * |
193 | * @param sc client to continue reading requests from | 194 | * @param cls where to process the write queue |
194 | */ | 195 | */ |
195 | static void | 196 | static void |
196 | continue_reading (struct CadetClient *sc) | 197 | continue_writing (void *cls) |
197 | { | ||
198 | refresh_timeout_task (sc); | ||
199 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
200 | "Finished processing cadet request from client %p, ready to receive the next one\n", | ||
201 | sc); | ||
202 | GNUNET_CADET_receive_done (sc->channel); | ||
203 | } | ||
204 | |||
205 | |||
206 | /** | ||
207 | * Transmit the next entry from the write queue. | ||
208 | * | ||
209 | * @param sc where to process the write queue | ||
210 | */ | ||
211 | static void | ||
212 | continue_writing (struct CadetClient *sc); | ||
213 | |||
214 | |||
215 | /** | ||
216 | * Send a reply now, cadet is ready. | ||
217 | * | ||
218 | * @param cls closure with the `struct CadetClient` which sent the query | ||
219 | * @param size number of bytes available in @a buf | ||
220 | * @param buf where to write the message | ||
221 | * @return number of bytes written to @a buf | ||
222 | */ | ||
223 | static size_t | ||
224 | write_continuation (void *cls, | ||
225 | size_t size, | ||
226 | void *buf) | ||
227 | { | 198 | { |
228 | struct CadetClient *sc = cls; | 199 | struct CadetClient *sc = cls; |
229 | struct GNUNET_CADET_Channel *tun; | 200 | struct GNUNET_MQ_Handle *mq; |
230 | struct WriteQueueItem *wqi; | ||
231 | size_t ret; | ||
232 | |||
233 | sc->wh = NULL; | ||
234 | if (NULL == (wqi = sc->wqi_head)) | ||
235 | { | ||
236 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
237 | "Write queue empty, reading more requests\n"); | ||
238 | return 0; | ||
239 | } | ||
240 | if ( (0 == size) || | ||
241 | (size < wqi->msize) ) | ||
242 | { | ||
243 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
244 | "Transmission of reply failed, terminating cadet\n"); | ||
245 | tun = sc->channel; | ||
246 | sc->channel = NULL; | ||
247 | GNUNET_CADET_channel_destroy (tun); | ||
248 | return 0; | ||
249 | } | ||
250 | GNUNET_CONTAINER_DLL_remove (sc->wqi_head, | ||
251 | sc->wqi_tail, | ||
252 | wqi); | ||
253 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
254 | "Transmitted %u byte reply via cadet to %p\n", | ||
255 | (unsigned int) size, | ||
256 | sc); | ||
257 | GNUNET_STATISTICS_update (GSF_stats, | ||
258 | gettext_noop ("# Blocks transferred via cadet"), 1, | ||
259 | GNUNET_NO); | ||
260 | ret = wqi->msize; | ||
261 | GNUNET_memcpy (buf, &wqi[1], ret); | ||
262 | GNUNET_free (wqi); | ||
263 | continue_writing (sc); | ||
264 | return ret; | ||
265 | } | ||
266 | |||
267 | |||
268 | /** | ||
269 | * Transmit the next entry from the write queue. | ||
270 | * | ||
271 | * @param sc where to process the write queue | ||
272 | */ | ||
273 | static void | ||
274 | continue_writing (struct CadetClient *sc) | ||
275 | { | ||
276 | struct WriteQueueItem *wqi; | ||
277 | struct GNUNET_CADET_Channel *tun; | ||
278 | 201 | ||
279 | if (NULL != sc->wh) | 202 | mq = GNUNET_CADET_get_mq (sc->channel); |
203 | if (0 != GNUNET_MQ_get_length (mq)) | ||
280 | { | 204 | { |
281 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 205 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
282 | "Write pending, waiting for it to complete\n"); | 206 | "Write pending, waiting for it to complete\n"); |
283 | return; /* write already pending */ | ||
284 | } | ||
285 | if (NULL == (wqi = sc->wqi_head)) | ||
286 | { | ||
287 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
288 | "Write queue empty, reading more requests\n"); | ||
289 | continue_reading (sc); | ||
290 | return; | ||
291 | } | ||
292 | sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO, | ||
293 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
294 | wqi->msize, | ||
295 | &write_continuation, | ||
296 | sc); | ||
297 | if (NULL == sc->wh) | ||
298 | { | ||
299 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
300 | "Write failed; terminating cadet\n"); | ||
301 | tun = sc->channel; | ||
302 | sc->channel = NULL; | ||
303 | GNUNET_CADET_channel_destroy (tun); | ||
304 | return; | 207 | return; |
305 | } | 208 | } |
209 | refresh_timeout_task (sc); | ||
210 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
211 | "Finished processing cadet request from client %p, ready to receive the next one\n", | ||
212 | sc); | ||
213 | GNUNET_CADET_receive_done (sc->channel); | ||
306 | } | 214 | } |
307 | 215 | ||
308 | 216 | ||
@@ -333,7 +241,7 @@ handle_datastore_reply (void *cls, | |||
333 | { | 241 | { |
334 | struct CadetClient *sc = cls; | 242 | struct CadetClient *sc = cls; |
335 | size_t msize = size + sizeof (struct CadetReplyMessage); | 243 | size_t msize = size + sizeof (struct CadetReplyMessage); |
336 | struct WriteQueueItem *wqi; | 244 | struct GNUNET_MQ_Envelope *env; |
337 | struct CadetReplyMessage *srm; | 245 | struct CadetReplyMessage *srm; |
338 | 246 | ||
339 | sc->qe = NULL; | 247 | sc->qe = NULL; |
@@ -357,7 +265,8 @@ handle_datastore_reply (void *cls, | |||
357 | GNUNET_h2s (key)); | 265 | GNUNET_h2s (key)); |
358 | } | 266 | } |
359 | GNUNET_STATISTICS_update (GSF_stats, | 267 | GNUNET_STATISTICS_update (GSF_stats, |
360 | gettext_noop ("# queries received via CADET not answered"), 1, | 268 | gettext_noop ("# queries received via CADET not answered"), |
269 | 1, | ||
361 | GNUNET_NO); | 270 | GNUNET_NO); |
362 | continue_writing (sc); | 271 | continue_writing (sc); |
363 | return; | 272 | return; |
@@ -369,9 +278,13 @@ handle_datastore_reply (void *cls, | |||
369 | GNUNET_h2s (key)); | 278 | GNUNET_h2s (key)); |
370 | if (GNUNET_OK != | 279 | if (GNUNET_OK != |
371 | GNUNET_FS_handle_on_demand_block (key, | 280 | GNUNET_FS_handle_on_demand_block (key, |
372 | size, data, type, | 281 | size, |
373 | priority, anonymity, | 282 | data, |
374 | expiration, uid, | 283 | type, |
284 | priority, | ||
285 | anonymity, | ||
286 | expiration, | ||
287 | uid, | ||
375 | &handle_datastore_reply, | 288 | &handle_datastore_reply, |
376 | sc)) | 289 | sc)) |
377 | { | 290 | { |
@@ -394,19 +307,23 @@ handle_datastore_reply (void *cls, | |||
394 | (unsigned int) type, | 307 | (unsigned int) type, |
395 | GNUNET_h2s (key), | 308 | GNUNET_h2s (key), |
396 | sc); | 309 | sc); |
397 | wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); | 310 | env = GNUNET_MQ_msg_extra (srm, |
398 | wqi->msize = msize; | 311 | size, |
399 | srm = (struct CadetReplyMessage *) &wqi[1]; | 312 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY); |
400 | srm->header.size = htons ((uint16_t) msize); | ||
401 | srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY); | ||
402 | srm->type = htonl (type); | 313 | srm->type = htonl (type); |
403 | srm->expiration = GNUNET_TIME_absolute_hton (expiration); | 314 | srm->expiration = GNUNET_TIME_absolute_hton (expiration); |
404 | GNUNET_memcpy (&srm[1], data, size); | 315 | GNUNET_memcpy (&srm[1], |
405 | sc->reply_size = msize; | 316 | data, |
406 | GNUNET_CONTAINER_DLL_insert (sc->wqi_head, | 317 | size); |
407 | sc->wqi_tail, | 318 | GNUNET_MQ_notify_sent (env, |
408 | wqi); | 319 | &continue_writing, |
409 | continue_writing (sc); | 320 | sc); |
321 | GNUNET_STATISTICS_update (GSF_stats, | ||
322 | gettext_noop ("# Blocks transferred via cadet"), | ||
323 | 1, | ||
324 | GNUNET_NO); | ||
325 | GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel), | ||
326 | env); | ||
410 | } | 327 | } |
411 | 328 | ||
412 | 329 | ||
@@ -414,30 +331,22 @@ handle_datastore_reply (void *cls, | |||
414 | * Functions with this signature are called whenever a | 331 | * Functions with this signature are called whenever a |
415 | * complete query message is received. | 332 | * complete query message is received. |
416 | * | 333 | * |
417 | * Do not call #GNUNET_SERVER_mst_destroy() in callback | ||
418 | * | ||
419 | * @param cls closure with the `struct CadetClient` | 334 | * @param cls closure with the `struct CadetClient` |
420 | * @param channel channel handle | 335 | * @param sqm the actual message |
421 | * @param channel_ctx channel context | ||
422 | * @param message the actual message | ||
423 | * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing | ||
424 | */ | 336 | */ |
425 | static int | 337 | static void |
426 | request_cb (void *cls, | 338 | handle_request (void *cls, |
427 | struct GNUNET_CADET_Channel *channel, | 339 | const struct CadetQueryMessage *sqm) |
428 | void **channel_ctx, | ||
429 | const struct GNUNET_MessageHeader *message) | ||
430 | { | 340 | { |
431 | struct CadetClient *sc = *channel_ctx; | 341 | struct CadetClient *sc = cls; |
432 | const struct CadetQueryMessage *sqm; | ||
433 | 342 | ||
434 | sqm = (const struct CadetQueryMessage *) message; | ||
435 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 343 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
436 | "Received query for `%s' via cadet from client %p\n", | 344 | "Received query for `%s' via cadet from client %p\n", |
437 | GNUNET_h2s (&sqm->query), | 345 | GNUNET_h2s (&sqm->query), |
438 | sc); | 346 | sc); |
439 | GNUNET_STATISTICS_update (GSF_stats, | 347 | GNUNET_STATISTICS_update (GSF_stats, |
440 | gettext_noop ("# queries received via cadet"), 1, | 348 | gettext_noop ("# queries received via cadet"), |
349 | 1, | ||
441 | GNUNET_NO); | 350 | GNUNET_NO); |
442 | refresh_timeout_task (sc); | 351 | refresh_timeout_task (sc); |
443 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, | 352 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, |
@@ -446,14 +355,14 @@ request_cb (void *cls, | |||
446 | ntohl (sqm->type), | 355 | ntohl (sqm->type), |
447 | 0 /* priority */, | 356 | 0 /* priority */, |
448 | GSF_datastore_queue_size, | 357 | GSF_datastore_queue_size, |
449 | &handle_datastore_reply, sc); | 358 | &handle_datastore_reply, |
359 | sc); | ||
450 | if (NULL == sc->qe) | 360 | if (NULL == sc->qe) |
451 | { | 361 | { |
452 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 362 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
453 | "Queueing request with datastore failed (queue full?)\n"); | 363 | "Queueing request with datastore failed (queue full?)\n"); |
454 | continue_writing (sc); | 364 | continue_writing (sc); |
455 | } | 365 | } |
456 | return GNUNET_OK; | ||
457 | } | 366 | } |
458 | 367 | ||
459 | 368 | ||
@@ -464,16 +373,12 @@ request_cb (void *cls, | |||
464 | * @param channel the channel representing the cadet | 373 | * @param channel the channel representing the cadet |
465 | * @param initiator the identity of the peer who wants to establish a cadet | 374 | * @param initiator the identity of the peer who wants to establish a cadet |
466 | * with us; NULL on binding error | 375 | * with us; NULL on binding error |
467 | * @param port cadet port used for the incoming connection | 376 | * @return initial channel context (our `struct CadetClient`) |
468 | * @param options channel option flags | ||
469 | * @return initial channel context (our 'struct CadetClient') | ||
470 | */ | 377 | */ |
471 | static void * | 378 | static void * |
472 | accept_cb (void *cls, | 379 | connect_cb (void *cls, |
473 | struct GNUNET_CADET_Channel *channel, | 380 | struct GNUNET_CADET_Channel *channel, |
474 | const struct GNUNET_PeerIdentity *initiator, | 381 | const struct GNUNET_PeerIdentity *initiator) |
475 | const struct GNUNET_HashCode *port, | ||
476 | enum GNUNET_CADET_ChannelOption options) | ||
477 | { | 382 | { |
478 | struct CadetClient *sc; | 383 | struct CadetClient *sc; |
479 | 384 | ||
@@ -481,13 +386,15 @@ accept_cb (void *cls, | |||
481 | if (sc_count >= sc_count_max) | 386 | if (sc_count >= sc_count_max) |
482 | { | 387 | { |
483 | GNUNET_STATISTICS_update (GSF_stats, | 388 | GNUNET_STATISTICS_update (GSF_stats, |
484 | gettext_noop ("# cadet client connections rejected"), 1, | 389 | gettext_noop ("# cadet client connections rejected"), |
390 | 1, | ||
485 | GNUNET_NO); | 391 | GNUNET_NO); |
486 | GNUNET_CADET_channel_destroy (channel); | 392 | GNUNET_CADET_channel_destroy (channel); |
487 | return NULL; | 393 | return NULL; |
488 | } | 394 | } |
489 | GNUNET_STATISTICS_update (GSF_stats, | 395 | GNUNET_STATISTICS_update (GSF_stats, |
490 | gettext_noop ("# cadet connections active"), 1, | 396 | gettext_noop ("# cadet connections active"), |
397 | 1, | ||
491 | GNUNET_NO); | 398 | GNUNET_NO); |
492 | sc = GNUNET_new (struct CadetClient); | 399 | sc = GNUNET_new (struct CadetClient); |
493 | sc->channel = channel; | 400 | sc->channel = channel; |
@@ -506,18 +413,17 @@ accept_cb (void *cls, | |||
506 | 413 | ||
507 | /** | 414 | /** |
508 | * Function called by cadet when a client disconnects. | 415 | * Function called by cadet when a client disconnects. |
509 | * Cleans up our 'struct CadetClient' of that channel. | 416 | * Cleans up our `struct CadetClient` of that channel. |
510 | * | 417 | * |
511 | * @param cls NULL | 418 | * @param cls our `struct CadetClient` |
512 | * @param channel channel of the disconnecting client | 419 | * @param channel channel of the disconnecting client |
513 | * @param channel_ctx our 'struct CadetClient' | 420 | * @param channel_ctx |
514 | */ | 421 | */ |
515 | static void | 422 | static void |
516 | cleaner_cb (void *cls, | 423 | disconnect_cb (void *cls, |
517 | const struct GNUNET_CADET_Channel *channel, | 424 | const struct GNUNET_CADET_Channel *channel) |
518 | void *channel_ctx) | ||
519 | { | 425 | { |
520 | struct CadetClient *sc = channel_ctx; | 426 | struct CadetClient *sc = cls; |
521 | struct WriteQueueItem *wqi; | 427 | struct WriteQueueItem *wqi; |
522 | 428 | ||
523 | if (NULL == sc) | 429 | if (NULL == sc) |
@@ -552,15 +458,42 @@ cleaner_cb (void *cls, | |||
552 | } | 458 | } |
553 | 459 | ||
554 | 460 | ||
461 | |||
462 | /** | ||
463 | * Function called whenever an MQ-channel's transmission window size changes. | ||
464 | * | ||
465 | * The first callback in an outgoing channel will be with a non-zero value | ||
466 | * and will mean the channel is connected to the destination. | ||
467 | * | ||
468 | * For an incoming channel it will be called immediately after the | ||
469 | * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. | ||
470 | * | ||
471 | * @param cls Channel closure. | ||
472 | * @param channel Connection to the other end (henceforth invalid). | ||
473 | * @param window_size New window size. If the is more messages than buffer size | ||
474 | * this value will be negative.. | ||
475 | */ | ||
476 | static void | ||
477 | window_change_cb (void *cls, | ||
478 | const struct GNUNET_CADET_Channel *channel, | ||
479 | int window_size) | ||
480 | { | ||
481 | /* FIXME: could do flow control here... */ | ||
482 | } | ||
483 | |||
484 | |||
555 | /** | 485 | /** |
556 | * Initialize subsystem for non-anonymous file-sharing. | 486 | * Initialize subsystem for non-anonymous file-sharing. |
557 | */ | 487 | */ |
558 | void | 488 | void |
559 | GSF_cadet_start_server () | 489 | GSF_cadet_start_server () |
560 | { | 490 | { |
561 | static const struct GNUNET_CADET_MessageHandler handlers[] = { | 491 | struct GNUNET_MQ_MessageHandler handlers[] = { |
562 | { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)}, | 492 | GNUNET_MQ_hd_fixed_size (request, |
563 | { NULL, 0, 0 } | 493 | GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, |
494 | struct CadetQueryMessage, | ||
495 | NULL), | ||
496 | GNUNET_MQ_handler_end () | ||
564 | }; | 497 | }; |
565 | struct GNUNET_HashCode port; | 498 | struct GNUNET_HashCode port; |
566 | 499 | ||
@@ -573,18 +506,19 @@ GSF_cadet_start_server () | |||
573 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 506 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
574 | "Initializing cadet FS server with a limit of %llu connections\n", | 507 | "Initializing cadet FS server with a limit of %llu connections\n", |
575 | sc_count_max); | 508 | sc_count_max); |
576 | listen_channel = GNUNET_CADET_connect (GSF_cfg, | 509 | cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); |
577 | NULL, | 510 | cadet_handle = GNUNET_CADET_connecT (GSF_cfg); |
578 | &cleaner_cb, | 511 | GNUNET_assert (NULL != cadet_handle); |
579 | handlers); | ||
580 | GNUNET_assert (NULL != listen_channel); | ||
581 | GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, | 512 | GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, |
582 | strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), | 513 | strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), |
583 | &port); | 514 | &port); |
584 | GNUNET_CADET_open_port (listen_channel, | 515 | cadet_port = GNUNET_CADET_open_porT (cadet_handle, |
585 | &port, | 516 | &port, |
586 | &accept_cb, | 517 | &connect_cb, |
587 | NULL); | 518 | NULL, |
519 | &window_change_cb, | ||
520 | &disconnect_cb, | ||
521 | handlers); | ||
588 | } | 522 | } |
589 | 523 | ||
590 | 524 | ||
@@ -594,10 +528,20 @@ GSF_cadet_start_server () | |||
594 | void | 528 | void |
595 | GSF_cadet_stop_server () | 529 | GSF_cadet_stop_server () |
596 | { | 530 | { |
597 | if (NULL != listen_channel) | 531 | GNUNET_CONTAINER_multipeermap_iterate (cadet_map, |
532 | &GSF_cadet_release_clients, | ||
533 | NULL); | ||
534 | GNUNET_CONTAINER_multipeermap_destroy (cadet_map); | ||
535 | cadet_map = NULL; | ||
536 | if (NULL != cadet_port) | ||
537 | { | ||
538 | GNUNET_CADET_close_port (cadet_port); | ||
539 | cadet_port = NULL; | ||
540 | } | ||
541 | if (NULL != cadet_handle) | ||
598 | { | 542 | { |
599 | GNUNET_CADET_disconnect (listen_channel); | 543 | GNUNET_CADET_disconnect (cadet_handle); |
600 | listen_channel = NULL; | 544 | cadet_handle = NULL; |
601 | } | 545 | } |
602 | GNUNET_assert (NULL == sc_head); | 546 | GNUNET_assert (NULL == sc_head); |
603 | GNUNET_assert (0 == sc_count); | 547 | GNUNET_assert (0 == sc_count); |