From aab2f42e7fc3fe7d0ad7b9a4a5e8b4bc42c78505 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 12 May 2010 11:05:45 +0000 Subject: ds --- contrib/defaults.conf | 1 + src/Makefile.am | 6 +- src/datastore/datastore_api.c | 126 +++++++++++++++++++++++++++------ src/fs/fs_namespace.c | 1 + src/fs/fs_publish.c | 4 ++ src/fs/fs_unindex.c | 5 +- src/fs/gnunet-service-fs.c | 39 +++++++++- src/fs/gnunet-service-fs_drq.c | 9 ++- src/include/gnunet_block_lib.h | 6 +- src/include/gnunet_datastore_service.h | 84 ++++++++++++++++------ 10 files changed, 220 insertions(+), 61 deletions(-) diff --git a/contrib/defaults.conf b/contrib/defaults.conf index 8aac5049e..b4a3ddd72 100644 --- a/contrib/defaults.conf +++ b/contrib/defaults.conf @@ -198,5 +198,6 @@ CONFIG = $DEFAULTCONFIG BINARY = gnunet-service-fs ACCEPT_FROM = 127.0.0.1; ACCEPT_FROM6 = ::1; +ACTIVEMIGRATION = YES # DEBUG = YES diff --git a/src/Makefile.am b/src/Makefile.am index 1756ac1ec..f709a63d6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -15,7 +15,6 @@ SUBDIRS = \ hello \ peerinfo \ datacache \ - datastore \ template \ transport \ peerinfo-tool \ @@ -24,6 +23,5 @@ SUBDIRS = \ testing \ hostlist \ topology \ - $(NAT_DIR) \ - fs \ - migration + $(NAT_DIR) + diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index c8b4f2e91..72f7faed7 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -28,6 +28,75 @@ #include "gnunet_datastore_service.h" #include "datastore.h" +/** + * Entry in our priority queue. + */ +struct QueueEntry +{ + + /** + * This is a linked list. + */ + struct QueueEntry *next; + + /** + * This is a linked list. + */ + struct QueueEntry *prev; + + /** + * Handle to the master context. + */ + struct GNUNET_DATASTORE_Handle *h; + + /** + * Task for timeout signalling. + */ + GNUNET_SCHEDULER_TaskIdentifier task; + + /** + * Timeout for the current operation. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Priority in the queue. + */ + unsigned int priority; + + /** + * Maximum allowed length of queue (otherwise + * this request should be discarded). + */ + unsigned int max_queue; + + /** + * Number of bytes in the request message following + * this struct. + */ + uint16_t message_size; + + /** + * Has this message been transmitted to the service? + * Only ever GNUNET_YES for the head of the queue. + */ + int16_t was_transmitted; + + /** + * Response processor (NULL if we are not waiting for a response). + * This struct should be used for the closure, function-specific + * arguments can be passed via 'client_ctx'. + */ + GNUNET_CLIENT_MessageHandler response_proc; + + /** + * Specific context (variable argument that + * can be used by the response processor). + */ + void *client_ctx; + +}; + /** * Handle to the datastore service. Followed * by 65536 bytes used for storing messages. @@ -51,27 +120,19 @@ struct GNUNET_DATASTORE_Handle struct GNUNET_CLIENT_Connection *client; /** - * Current response processor (NULL if we are not waiting for a - * response). The specific type depends on the kind of message we - * just transmitted. + * Current head of priority queue. */ - void *response_proc; - - /** - * Closure for response_proc. - */ - void *response_proc_cls; + struct QueueEntry *queue_head; /** - * Timeout for the current operation. + * Current tail of priority queue. */ - struct GNUNET_TIME_Absolute timeout; + struct QueueEntry *queue_tail; /** - * Number of bytes in the message following - * this struct, 0 if we have no request pending. + * Number of entries in the queue. */ - size_t message_size; + unsigned int queue_size; }; @@ -84,12 +145,13 @@ struct GNUNET_DATASTORE_Handle * @param sched scheduler to use * @return handle to use to access the service */ -struct GNUNET_DATASTORE_Handle *GNUNET_DATASTORE_connect (const struct - GNUNET_CONFIGURATION_Handle - *cfg, - struct - GNUNET_SCHEDULER_Handle - *sched) +struct GNUNET_DATASTORE_Handle * +GNUNET_DATASTORE_connect (const struct + GNUNET_CONFIGURATION_Handle + *cfg, + struct + GNUNET_SCHEDULER_Handle + *sched) { struct GNUNET_CLIENT_Connection *c; struct GNUNET_DATASTORE_Handle *h; @@ -108,10 +170,16 @@ struct GNUNET_DATASTORE_Handle *GNUNET_DATASTORE_connect (const struct /** * Transmit DROP message to datastore service. + * + * @param cls the 'struct GNUNET_DATASTORE_Handle' + * @param size number of bytes that can be copied to buf + * @param buf where to copy the drop message + * @return number of bytes written to buf */ static size_t transmit_drop (void *cls, - size_t size, void *buf) + size_t size, + void *buf) { struct GNUNET_DATASTORE_Handle *h = cls; struct GNUNET_MessageHeader *hdr; @@ -142,9 +210,20 @@ transmit_drop (void *cls, void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop) { + struct QueueEntry *qe; + if (h->client != NULL) GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); h->client = NULL; + while (NULL != (qe = h->queue_head)) + { + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); + if (NULL != qe->response_proc) + qe->response_proc (qe, NULL); + GNUNET_free (qe); + } if (GNUNET_YES == drop) { h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); @@ -166,6 +245,7 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, } +#if 0 /** * Type of a function to call when we receive a message * from the service. This specific function is used @@ -764,6 +844,6 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, memcpy (&dm[1], data, size); transmit_for_status (h, cont, cont_cls, timeout); } - +#endif /* end of datastore_api.c */ diff --git a/src/fs/fs_namespace.c b/src/fs/fs_namespace.c index e853ce813..9e198adc2 100644 --- a/src/fs/fs_namespace.c +++ b/src/fs/fs_namespace.c @@ -203,6 +203,7 @@ advertisement_cont (void *cls, ac->priority, ac->anonymity, ac->expiration, + -2, 1, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &advertisement_cont, ac); diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c index be2994176..aa7f794be 100644 --- a/src/fs/fs_publish.c +++ b/src/fs/fs_publish.c @@ -483,6 +483,7 @@ block_proc (void *cls, p->priority, p->anonymity, p->expirationTime, + -2, 1, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ds_put_cont, dpc_cls); @@ -504,6 +505,7 @@ block_proc (void *cls, p->priority, p->anonymity, p->expirationTime, + -2, 1, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &ds_put_cont, dpc_cls); @@ -1354,6 +1356,7 @@ publish_ksk_cont (void *cls, pkc->priority, pkc->anonymity, pkc->expirationTime, + -2, 1, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &kb_put_cont, pkc); @@ -1667,6 +1670,7 @@ GNUNET_FS_publish_sks (struct GNUNET_FS_Handle *h, priority, anonymity, expirationTime, + -2, 1, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &sb_put_cont, psc); diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c index 349cc4251..220ec5870 100644 --- a/src/fs/fs_unindex.c +++ b/src/fs/fs_unindex.c @@ -218,9 +218,10 @@ unindex_process (void *cls, query, size, data, + -2, 1, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, &process_cont, - uc, - GNUNET_CONSTANTS_SERVICE_TIMEOUT); + uc); } diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 42dd2d23b..7e88f9d4e 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -606,6 +606,10 @@ static struct ClientList *client_list; */ static struct GNUNET_CORE_Handle *core; +/** + * Are we allowed to migrate content to this peer. + */ +static int active_migration; /* ******************* clean up functions ************************ */ @@ -1998,6 +2002,24 @@ process_reply (void *cls, } + +/** + * Continuation called to notify client about result of the + * operation. + * + * @param cls closure + * @param success GNUNET_SYSERR on failure + * @param msg NULL on success, otherwise an error message + */ +static void +put_migration_continuation (void *cls, + int success, + const char *msg) +{ + /* FIXME */ +} + + /** * Handle P2P "PUT" message. * @@ -2076,9 +2098,17 @@ handle_p2p_put (void *cls, &query, &process_reply, &prq); - // FIXME: if migration is on and load is low, - // queue to store data in datastore; - // use "prq.priority" for that! + if (GNUNET_YES == active_migration) + { + GNUNET_DATASTORE_put (NULL /* FIXME */, + 0, &query, dsize, &put[1], + type, prq.priority, 1 /* anonymity */, + expiration, + 0, 64 /* FIXME: use define */, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &put_migration_continuation, + NULL); + } return GNUNET_OK; } @@ -2936,6 +2966,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) { + active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg, + "FS", + "ACTIVEMIGRATION"); if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) || (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) || (GNUNET_OK != main_init (sched, server, cfg)) ) diff --git a/src/fs/gnunet-service-fs_drq.c b/src/fs/gnunet-service-fs_drq.c index fde34187a..ab6c9ad21 100644 --- a/src/fs/gnunet-service-fs_drq.c +++ b/src/fs/gnunet-service-fs_drq.c @@ -222,9 +222,10 @@ run_next_request (void *cls, GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, + 42 /* FIXME */, 64 /* FIXME */, + GNUNET_TIME_absolute_get_remaining(gc->timeout), &get_iterator, - gc, - GNUNET_TIME_absolute_get_remaining(gc->timeout)); + gc); } @@ -508,8 +509,10 @@ GNUNET_FS_drq_remove (const GNUNET_HashCode *key, rc->cont_cls = cont_cls; rc->rmdsh = rmdsh; GNUNET_DATASTORE_remove (rmdsh, key, size, data, + -3, 128, + timeout, &drq_remove_cont, - rc, timeout); + rc); } diff --git a/src/include/gnunet_block_lib.h b/src/include/gnunet_block_lib.h index 513605eb2..d76a7f63f 100644 --- a/src/include/gnunet_block_lib.h +++ b/src/include/gnunet_block_lib.h @@ -80,9 +80,6 @@ enum GNUNET_BLOCK_Type }; - - - /** * @brief keyword block (advertising data under a keyword) */ @@ -110,6 +107,7 @@ struct KBlock }; + /** * @brief namespace content block (advertising data under an identifier in a namespace) */ @@ -193,8 +191,6 @@ struct NBlock }; - - /** * @brief index block (indexing a DBlock that * can be obtained directly from reading diff --git a/src/include/gnunet_datastore_service.h b/src/include/gnunet_datastore_service.h index bf18860a6..6f16dad22 100644 --- a/src/include/gnunet_datastore_service.h +++ b/src/include/gnunet_datastore_service.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors) + (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -80,7 +80,9 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, * operation. * * @param cls closure - * @param success GNUNET_SYSERR on failure + * @param success GNUNET_SYSERR on failure, + * GNUNET_NO on timeout/queue drop + * GNUNET_YES on success * @param msg NULL on success, otherwise an error message */ typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, @@ -96,18 +98,23 @@ typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, * @param h handle to the datastore * @param amount how much space (in bytes) should be reserved (for content only) * @param entries how many entries will be created (to calculate per-entry overhead) + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response (or before dying in queue) * @param cont continuation to call when done; "success" will be set to * a positive reservation value if space could be reserved. * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout); + void *cont_cls); /** @@ -125,6 +132,9 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, * @param priority priority of the content * @param anonymity anonymity-level for the content * @param expiration expiration time for the content + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) * @param timeout timeout for the operation * @param cont continuation to call when done * @param cont_cls closure for cont @@ -139,6 +149,8 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t priority, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls); @@ -152,16 +164,24 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * @param h handle to the datastore * @param rid reservation ID (value of "success" in original continuation * from the "reserve" function). + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, int rid, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout); + void *cont_cls); /** @@ -171,18 +191,23 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * @param uid identifier for the value * @param priority how much to increase the priority of the value * @param expiration new expiration value should be MAX of existing and this argument + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, unsigned long long uid, uint32_t priority, struct GNUNET_TIME_Absolute expiration, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout); + void *cont_cls); /** @@ -220,18 +245,23 @@ typedef void (*GNUNET_DATASTORE_Iterator) (void *cls, * @param h handle to the datastore * @param key maybe NULL (to match all entries) * @param type desired type, 0 for any + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode * key, enum GNUNET_BLOCK_Type type, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_Iterator iter, - void *iter_cls, - struct GNUNET_TIME_Relative timeout); + void *iter_cls); /** @@ -251,16 +281,22 @@ GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h, * Get a random value from the datastore. * * @param h handle to the datastore + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param iter function to call on a random value; it * will be called once with a value (if available) * and always once with a value of NULL. * @param iter_cls closure for iter - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, - GNUNET_DATASTORE_Iterator iter, void *iter_cls, - struct GNUNET_TIME_Relative timeout); + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, + GNUNET_DATASTORE_Iterator iter, + void *iter_cls); /** @@ -274,17 +310,23 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, * @param key key for the value * @param size number of bytes in data * @param data content stored + * @param queue_priority ranking of this request in the priority queue + * @param max_queue_size at what queue size should this request be dropped + * (if other requests of higher priority are in the queue) + * @param timeout how long to wait at most for a response * @param cont continuation to call when done * @param cont_cls closure for cont - * @param timeout how long to wait at most for a response */ void GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, const GNUNET_HashCode *key, - uint32_t size, const void *data, + uint32_t size, + const void *data, + unsigned int queue_priority, + unsigned int max_queue_size, + struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, - void *cont_cls, - struct GNUNET_TIME_Relative timeout); + void *cont_cls); #if 0 /* keep Emacsens' auto-indent happy */ -- cgit v1.2.3