aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-03 11:06:03 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-03 11:06:03 +0000
commitb8eebba5dd3716f4d585e0b6b85d6ba36ef0241e (patch)
treead9d55f5bc9cbb3a3ea47105424226a83b18f5c7 /src/fs
parenta975c050f211d2ebe70eb78915fb30e0211ece64 (diff)
downloadgnunet-b8eebba5dd3716f4d585e0b6b85d6ba36ef0241e.tar.gz
gnunet-b8eebba5dd3716f4d585e0b6b85d6ba36ef0241e.zip
convert unindex to MQ
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/fs_api.h2
-rw-r--r--src/fs/fs_unindex.c133
2 files changed, 75 insertions, 60 deletions
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h
index 0e1476f5c..398de27fd 100644
--- a/src/fs/fs_api.h
+++ b/src/fs/fs_api.h
@@ -1435,7 +1435,7 @@ struct GNUNET_FS_UnindexContext
1435 * Connection to the FS service, only valid during the 1435 * Connection to the FS service, only valid during the
1436 * #UNINDEX_STATE_FS_NOTIFY phase. 1436 * #UNINDEX_STATE_FS_NOTIFY phase.
1437 */ 1437 */
1438 struct GNUNET_CLIENT_Connection *client; 1438 struct GNUNET_MQ_Handle *mq;
1439 1439
1440 /** 1440 /**
1441 * Connection to the datastore service, only valid during the 1441 * Connection to the datastore service, only valid during the
diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c
index 2c4cb6ae6..e614fbe03 100644
--- a/src/fs/fs_unindex.c
+++ b/src/fs/fs_unindex.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2003--2013 GNUnet e.V. 3 Copyright (C) 2003--2013, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -221,53 +221,57 @@ unindex_process (void *cls,
221 221
222 222
223/** 223/**
224 * Function called with the response from the 224 * Function called with the response from the FS service to our
225 * FS service to our unindexing request. 225 * unindexing request.
226 * 226 *
227 * @param cls closure, unindex context 227 * @param cls closure, unindex context
228 * @param msg NULL on timeout, otherwise the response 228 * @param msg the response
229 */ 229 */
230static void 230static void
231process_fs_response (void *cls, const struct GNUNET_MessageHeader *msg) 231handle_unindex_response (void *cls,
232 const struct GNUNET_MessageHeader *msg)
232{ 233{
233 struct GNUNET_FS_UnindexContext *uc = cls; 234 struct GNUNET_FS_UnindexContext *uc = cls;
234 struct GNUNET_FS_ProgressInfo pi; 235 struct GNUNET_FS_ProgressInfo pi;
235 236
236 if (uc->client != NULL) 237 if (NULL != uc->mq)
237 { 238 {
238 GNUNET_CLIENT_disconnect (uc->client); 239 GNUNET_MQ_destroy (uc->mq);
239 uc->client = NULL; 240 uc->mq = NULL;
240 }
241 if (uc->state != UNINDEX_STATE_FS_NOTIFY)
242 {
243 uc->state = UNINDEX_STATE_ERROR;
244 uc->emsg =
245 GNUNET_strdup (_("Unexpected time for a response from `fs' service."));
246 GNUNET_FS_unindex_sync_ (uc);
247 signal_unindex_error (uc);
248 return;
249 }
250 if (NULL == msg)
251 {
252 uc->state = UNINDEX_STATE_ERROR;
253 uc->emsg = GNUNET_strdup (_("Timeout waiting for `fs' service."));
254 GNUNET_FS_unindex_sync_ (uc);
255 signal_unindex_error (uc);
256 return;
257 }
258 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK)
259 {
260 uc->state = UNINDEX_STATE_ERROR;
261 uc->emsg = GNUNET_strdup (_("Invalid response from `fs' service."));
262 GNUNET_FS_unindex_sync_ (uc);
263 signal_unindex_error (uc);
264 return;
265 } 241 }
266 uc->state = UNINDEX_STATE_COMPLETE; 242 uc->state = UNINDEX_STATE_COMPLETE;
267 pi.status = GNUNET_FS_STATUS_UNINDEX_COMPLETED; 243 pi.status = GNUNET_FS_STATUS_UNINDEX_COMPLETED;
268 pi.value.unindex.eta = GNUNET_TIME_UNIT_ZERO; 244 pi.value.unindex.eta = GNUNET_TIME_UNIT_ZERO;
269 GNUNET_FS_unindex_sync_ (uc); 245 GNUNET_FS_unindex_sync_ (uc);
270 GNUNET_FS_unindex_make_status_ (&pi, uc, uc->file_size); 246 GNUNET_FS_unindex_make_status_ (&pi,
247 uc,
248 uc->file_size);
249}
250
251
252/**
253 * Generic error handler, called with the appropriate error code and
254 * the same closure specified at the creation of the message queue.
255 * Not every message queue implementation supports an error handler.
256 *
257 * @param cls closure with the `struct GNUNET_FS_UnindexContext *`
258 * @param error error code
259 */
260static void
261unindex_mq_error_handler (void *cls,
262 enum GNUNET_MQ_Error error)
263{
264 struct GNUNET_FS_UnindexContext *uc = cls;
265
266 if (NULL != uc->mq)
267 {
268 GNUNET_MQ_destroy (uc->mq);
269 uc->mq = NULL;
270 }
271 uc->state = UNINDEX_STATE_ERROR;
272 uc->emsg = GNUNET_strdup (_("Error communicating with `fs' service."));
273 GNUNET_FS_unindex_sync_ (uc);
274 signal_unindex_error (uc);
271} 275}
272 276
273 277
@@ -281,12 +285,25 @@ process_fs_response (void *cls, const struct GNUNET_MessageHeader *msg)
281static void 285static void
282unindex_finish (struct GNUNET_FS_UnindexContext *uc) 286unindex_finish (struct GNUNET_FS_UnindexContext *uc)
283{ 287{
288 GNUNET_MQ_hd_fixed_size (unindex_response,
289 GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK,
290 struct GNUNET_MessageHeader);
291 struct GNUNET_MQ_MessageHandler handlers[] = {
292 make_unindex_response_handler (uc),
293 GNUNET_MQ_handler_end ()
294 };
284 char *emsg; 295 char *emsg;
285 struct UnindexMessage req; 296 struct GNUNET_MQ_Envelope *env;
297 struct UnindexMessage *req;
286 298
287 /* generate final progress message */ 299 /* generate final progress message */
288 unindex_progress (uc, uc->file_size, NULL, 0, 0); 300 unindex_progress (uc,
289 GNUNET_FS_tree_encoder_finish (uc->tc, &emsg); 301 uc->file_size,
302 NULL,
303 0,
304 0);
305 GNUNET_FS_tree_encoder_finish (uc->tc,
306 &emsg);
290 uc->tc = NULL; 307 uc->tc = NULL;
291 GNUNET_DISK_file_close (uc->fh); 308 GNUNET_DISK_file_close (uc->fh);
292 uc->fh = NULL; 309 uc->fh = NULL;
@@ -296,8 +313,12 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc)
296 uc->seen_dh = NULL; 313 uc->seen_dh = NULL;
297 uc->state = UNINDEX_STATE_FS_NOTIFY; 314 uc->state = UNINDEX_STATE_FS_NOTIFY;
298 GNUNET_FS_unindex_sync_ (uc); 315 GNUNET_FS_unindex_sync_ (uc);
299 uc->client = GNUNET_CLIENT_connect ("fs", uc->h->cfg); 316 uc->mq = GNUNET_CLIENT_connecT (uc->h->cfg,
300 if (uc->client == NULL) 317 "fs",
318 handlers,
319 &unindex_mq_error_handler,
320 uc);
321 if (NULL == uc->mq)
301 { 322 {
302 uc->state = UNINDEX_STATE_ERROR; 323 uc->state = UNINDEX_STATE_ERROR;
303 uc->emsg = 324 uc->emsg =
@@ -308,21 +329,15 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc)
308 } 329 }
309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
310 "Sending UNINDEX message to FS service\n"); 331 "Sending UNINDEX message to FS service\n");
311 req.header.size = htons (sizeof (struct UnindexMessage)); 332 env = GNUNET_MQ_msg (req,
312 req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_UNINDEX); 333 GNUNET_MESSAGE_TYPE_FS_UNINDEX);
313 req.reserved = 0; 334 req->reserved = 0;
314 req.file_id = uc->file_id; 335 req->file_id = uc->file_id;
315 GNUNET_break (GNUNET_OK == 336 GNUNET_MQ_send (uc->mq,
316 GNUNET_CLIENT_transmit_and_get_response (uc->client, 337 env);
317 &req.header,
318 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
319 GNUNET_YES,
320 &process_fs_response,
321 uc));
322} 338}
323 339
324 340
325
326/** 341/**
327 * Function called by the directory scanner as we extract keywords 342 * Function called by the directory scanner as we extract keywords
328 * that we will need to remove UBlocks. 343 * that we will need to remove UBlocks.
@@ -745,10 +760,10 @@ GNUNET_FS_unindex_signal_suspend_ (void *cls)
745 GNUNET_FS_uri_destroy (uc->ksk_uri); 760 GNUNET_FS_uri_destroy (uc->ksk_uri);
746 uc->ksk_uri = NULL; 761 uc->ksk_uri = NULL;
747 } 762 }
748 if (uc->client != NULL) 763 if (NULL != uc->mq)
749 { 764 {
750 GNUNET_CLIENT_disconnect (uc->client); 765 GNUNET_MQ_destroy (uc->mq);
751 uc->client = NULL; 766 uc->mq = NULL;
752 } 767 }
753 if (NULL != uc->dsh) 768 if (NULL != uc->dsh)
754 { 769 {
@@ -835,7 +850,7 @@ GNUNET_FS_unindex_stop (struct GNUNET_FS_UnindexContext *uc)
835{ 850{
836 struct GNUNET_FS_ProgressInfo pi; 851 struct GNUNET_FS_ProgressInfo pi;
837 852
838 if (uc->dscan != NULL) 853 if (NULL != uc->dscan)
839 { 854 {
840 GNUNET_FS_directory_scan_abort (uc->dscan); 855 GNUNET_FS_directory_scan_abort (uc->dscan);
841 uc->dscan = NULL; 856 uc->dscan = NULL;
@@ -845,15 +860,15 @@ GNUNET_FS_unindex_stop (struct GNUNET_FS_UnindexContext *uc)
845 GNUNET_DATASTORE_cancel (uc->dqe); 860 GNUNET_DATASTORE_cancel (uc->dqe);
846 uc->dqe = NULL; 861 uc->dqe = NULL;
847 } 862 }
848 if (uc->fhc != NULL) 863 if (NULL != uc->fhc)
849 { 864 {
850 GNUNET_CRYPTO_hash_file_cancel (uc->fhc); 865 GNUNET_CRYPTO_hash_file_cancel (uc->fhc);
851 uc->fhc = NULL; 866 uc->fhc = NULL;
852 } 867 }
853 if (uc->client != NULL) 868 if (NULL != uc->mq)
854 { 869 {
855 GNUNET_CLIENT_disconnect (uc->client); 870 GNUNET_MQ_destroy (uc->mq);
856 uc->client = NULL; 871 uc->mq = NULL;
857 } 872 }
858 if (NULL != uc->dsh) 873 if (NULL != uc->dsh)
859 { 874 {