aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/gnunet-service-datastore.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2009-06-16 22:08:40 +0000
committerChristian Grothoff <christian@grothoff.org>2009-06-16 22:08:40 +0000
commit55fe7180de4f26582646418073bd52bf13bb5697 (patch)
tree30296a970be4a650f1f82644ead9f9be305f5a9d /src/datastore/gnunet-service-datastore.c
parenta69e21076e7a4d6e5b6d66078cae83c0626c94aa (diff)
downloadgnunet-55fe7180de4f26582646418073bd52bf13bb5697.tar.gz
gnunet-55fe7180de4f26582646418073bd52bf13bb5697.zip
further datastore API improvements:
Diffstat (limited to 'src/datastore/gnunet-service-datastore.c')
-rw-r--r--src/datastore/gnunet-service-datastore.c346
1 files changed, 310 insertions, 36 deletions
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index db3524557..9a24865da 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -24,14 +24,8 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * 25 *
26 * TODO: 26 * TODO:
27 * 1) transmit and transmit flow-control (when do we signal client 'success'? 27 * 1) semantics of "PUT" (plugin) if entry exists (should likely
28 * ALSO: async transmit will need to address ref-counting issues on client! 28 * be similar to "UPDATE" (need to specify in PLUGIN API!)
29 * 2) efficient "update" for client to raise priority / expiration
30 * (not possible with current datastore API, but plugin API has support!);
31 * [ maybe integrate desired priority/expiration updates directly
32 * with 'GET' request? ]
33 * 3) semantics of "PUT" (plugin) if entry exists (should likely
34 * be similar to "UPDATE" (need to specify in PLUGIN API!)
35 * 4) quota management code! 29 * 4) quota management code!
36 * 5) add bloomfilter for efficiency! 30 * 5) add bloomfilter for efficiency!
37 */ 31 */
@@ -42,6 +36,11 @@
42#include "plugin_datastore.h" 36#include "plugin_datastore.h"
43#include "datastore.h" 37#include "datastore.h"
44 38
39/**
40 * How many messages do we queue at most per client?
41 */
42#define MAX_PENDING 1024
43
45 44
46/** 45/**
47 * Our datastore plugin. 46 * Our datastore plugin.
@@ -75,19 +74,182 @@ struct DatastorePlugin
75 74
76 75
77/** 76/**
77 * Linked list of active reservations.
78 */
79struct ReservationList
80{
81
82 /**
83 * This is a linked list.
84 */
85 struct ReservationList *next;
86
87 /**
88 * Client that made the reservation.
89 */
90 struct GNUNET_SERVER_Client *client;
91
92 /**
93 * Number of bytes (still) reserved.
94 */
95 uint64_t size;
96
97 /**
98 * Number of items (still) reserved.
99 */
100 uint64_t items;
101
102 /**
103 * Reservation identifier.
104 */
105 int32_t rid;
106
107};
108
109
110/**
78 * Our datastore plugin (NULL if not available). 111 * Our datastore plugin (NULL if not available).
79 */ 112 */
80static struct DatastorePlugin *plugin; 113static struct DatastorePlugin *plugin;
81 114
115/**
116 * Linked list of space reservations made by clients.
117 */
118static struct ReservationList *reservations;
119
120/**
121 * Static counter to produce reservation identifiers.
122 */
123static int reservation_gen;
124
125/**
126 * How much space are we allowed to use?
127 */
128static unsigned long long quota;
129
130
131/**
132 * Function called once the transmit operation has
133 * either failed or succeeded.
134 *
135 * @param cls closure
136 * @param status GNUNET_OK on success, GNUNET_SYSERR on error
137 */
138typedef void (*TransmitContinuation)(void *cls,
139 int status);
140
141struct TransmitCallbackContext
142{
143 /**
144 * The message that we're asked to transmit.
145 */
146 struct GNUNET_MessageHeader *msg;
147
148 /**
149 * Client that we are transmitting to.
150 */
151 struct GNUNET_SERVER_Client *client;
152
153 /**
154 * Function to call once msg has been transmitted
155 * (or at least added to the buffer).
156 */
157 TransmitContinuation tc;
158
159 /**
160 * Closure for tc.
161 */
162 void *tc_cls;
163
164 /**
165 * GNUNET_YES if we are supposed to signal the server
166 * completion of the client's request.
167 */
168 int end;
169};
170
171
172/**
173 * Function called to notify a client about the socket
174 * begin ready to queue more data. "buf" will be
175 * NULL and "size" zero if the socket was closed for
176 * writing in the meantime.
177 *
178 * @param cls closure
179 * @param size number of bytes available in buf
180 * @param buf where the callee should write the message
181 * @return number of bytes written to buf
182 */
183static size_t
184transmit_callback (void *cls,
185 size_t size, void *buf)
186{
187 struct TransmitCallbackContext *tcc = cls;
188 size_t msize;
189
190 msize = ntohs(tcc->msg->size);
191 if (size == 0)
192 {
193 if (tcc->tc != NULL)
194 tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
195 if (GNUNET_YES == tcc->end)
196 GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);
197 GNUNET_free (tcc->msg);
198 GNUNET_free (tcc);
199 return 0;
200 }
201 GNUNET_assert (size >= msize);
202 memcpy (buf, tcc->msg, msize);
203 if (tcc->tc != NULL)
204 tcc->tc (tcc->tc_cls, GNUNET_OK);
205 if (GNUNET_YES == tcc->end)
206 GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
207 GNUNET_free (tcc->msg);
208 GNUNET_free (tcc);
209 return msize;
210}
211
82 212
83/** 213/**
84 * Transmit the given message to the client. 214 * Transmit the given message to the client.
215 *
216 * @param client target of the message
217 * @param msg message to transmit, will be freed!
218 * @param end is this the last response (and we should
219 * signal the server completion accodingly after
220 * transmitting this message)?
85 */ 221 */
86static void 222static void
87transmit (struct GNUNET_SERVER_Client *client, 223transmit (struct GNUNET_SERVER_Client *client,
88 const struct GNUNET_MessageHeader *msg) 224 struct GNUNET_MessageHeader *msg,
225 TransmitContinuation tc,
226 void *tc_cls,
227 int end)
89{ 228{
90 /* FIXME! */ 229 struct TransmitCallbackContext *tcc;
230
231 tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
232 tcc->msg = msg;
233 tcc->client = client;
234 tcc->tc = tc;
235 tcc->tc_cls = tc_cls;
236 tcc->end = end;
237
238 if (NULL ==
239 GNUNET_SERVER_notify_transmit_ready (client,
240 ntohs(msg->size),
241 GNUNET_TIME_UNIT_FOREVER_REL,
242 &transmit_callback,
243 tcc))
244 {
245 GNUNET_break (0);
246 if (GNUNET_YES == end)
247 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
248 if (NULL != tc)
249 tc (tc_cls, GNUNET_SYSERR);
250 GNUNET_free (msg);
251 GNUNET_free (tcc);
252 }
91} 253}
92 254
93 255
@@ -112,8 +274,29 @@ transmit_status (struct GNUNET_SERVER_Client *client,
112 sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); 274 sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
113 sm->status = htonl(code); 275 sm->status = htonl(code);
114 memcpy (&sm[1], msg, slen); 276 memcpy (&sm[1], msg, slen);
115 transmit (client, &sm->header); 277 transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
116 GNUNET_free (sm); 278}
279
280
281/**
282 * Function called once the transmit operation has
283 * either failed or succeeded.
284 *
285 * @param cls closure
286 * @param status GNUNET_OK on success, GNUNET_SYSERR on error
287 */
288static void
289get_next(void *next_cls,
290 int status)
291{
292 if (status != GNUNET_OK)
293 {
294 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
295 _("Failed to transmit an item to the client; aborting iteration.\n"));
296 plugin->api->next_request (next_cls, GNUNET_YES);
297 return;
298 }
299 plugin->api->next_request (next_cls, GNUNET_NO);
117} 300}
118 301
119 302
@@ -122,6 +305,7 @@ transmit_status (struct GNUNET_SERVER_Client *client,
122 * to the client. 305 * to the client.
123 * 306 *
124 * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client). 307 * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
308 * @param next_cls closure to use to ask for the next item
125 * @param key key for the content 309 * @param key key for the content
126 * @param size number of bytes in data 310 * @param size number of bytes in data
127 * @param data content stored 311 * @param data content stored
@@ -137,6 +321,7 @@ transmit_status (struct GNUNET_SERVER_Client *client,
137 */ 321 */
138static int 322static int
139transmit_item (void *cls, 323transmit_item (void *cls,
324 void *next_cls,
140 const GNUNET_HashCode * key, 325 const GNUNET_HashCode * key,
141 uint32_t size, 326 uint32_t size,
142 const void *data, 327 const void *data,
@@ -147,15 +332,17 @@ transmit_item (void *cls,
147 expiration, unsigned long long uid) 332 expiration, unsigned long long uid)
148{ 333{
149 struct GNUNET_SERVER_Client *client = cls; 334 struct GNUNET_SERVER_Client *client = cls;
150 struct GNUNET_MessageHeader end; 335 struct GNUNET_MessageHeader *end;
151 struct DataMessage *dm; 336 struct DataMessage *dm;
152 337
153 if (key == NULL) 338 if (key == NULL)
154 { 339 {
155 /* transmit 'DATA_END' */ 340 /* transmit 'DATA_END' */
156 end.size = htons(sizeof(struct GNUNET_MessageHeader)); 341 end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
157 end.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END); 342 end->size = htons(sizeof(struct GNUNET_MessageHeader));
158 transmit (client, &end); 343 end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
344 transmit (client, end, NULL, NULL, GNUNET_YES);
345 GNUNET_SERVER_client_drop (client);
159 return GNUNET_OK; 346 return GNUNET_OK;
160 } 347 }
161 dm = GNUNET_malloc (sizeof(struct DataMessage) + size); 348 dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
@@ -170,8 +357,7 @@ transmit_item (void *cls,
170 dm->uid = GNUNET_htonll(uid); 357 dm->uid = GNUNET_htonll(uid);
171 dm->key = *key; 358 dm->key = *key;
172 memcpy (&dm[1], data, size); 359 memcpy (&dm[1], data, size);
173 transmit (client, &dm->header); 360 transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
174 GNUNET_free (dm);
175 return GNUNET_OK; 361 return GNUNET_OK;
176} 362}
177 363
@@ -188,8 +374,20 @@ handle_reserve (void *cls,
188 struct GNUNET_SERVER_Client *client, 374 struct GNUNET_SERVER_Client *client,
189 const struct GNUNET_MessageHeader *message) 375 const struct GNUNET_MessageHeader *message)
190{ 376{
191 transmit_status (client, GNUNET_SYSERR, "not implemented"); 377 const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
192 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 378 struct ReservationList *e;
379
380 /* FIXME: check if we have that much space... */
381 e = GNUNET_malloc (sizeof(struct ReservationList));
382 e->next = reservations;
383 reservations = e;
384 e->client = client;
385 e->size = GNUNET_ntohll(msg->size);
386 e->items = GNUNET_ntohll(msg->items);
387 e->rid = ++reservation_gen;
388 if (reservation_gen < 0)
389 reservation_gen = 0; /* wrap around */
390 transmit_status (client, e->rid, NULL);
193} 391}
194 392
195 393
@@ -205,8 +403,32 @@ handle_release_reserve (void *cls,
205 struct GNUNET_SERVER_Client *client, 403 struct GNUNET_SERVER_Client *client,
206 const struct GNUNET_MessageHeader *message) 404 const struct GNUNET_MessageHeader *message)
207{ 405{
208 transmit_status (client, GNUNET_SYSERR, "not implemented"); 406 const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
209 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 407 struct ReservationList *pos;
408 struct ReservationList *prev;
409 struct ReservationList *next;
410
411 int rid = ntohl(msg->rid);
412 next = reservations;
413 prev = NULL;
414 while (NULL != (pos = next))
415 {
416 next = pos->next;
417 if (rid == pos->rid)
418 {
419 if (prev == NULL)
420 reservations = next;
421 else
422 prev->next = next;
423 /* FIXME: released remaining reserved space! */
424 GNUNET_free (pos);
425 transmit_status (client, GNUNET_OK, NULL);
426 return;
427 }
428 prev = pos;
429 pos = next;
430 }
431 transmit_status (client, GNUNET_SYSERR, "Could not find matching reservation");
210} 432}
211 433
212 434
@@ -284,7 +506,6 @@ handle_put (void *cls,
284 &msg); 506 &msg);
285 transmit_status (client, ret, msg); 507 transmit_status (client, ret, msg);
286 GNUNET_free_non_null (msg); 508 GNUNET_free_non_null (msg);
287 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
288} 509}
289 510
290 511
@@ -312,13 +533,13 @@ handle_get (void *cls,
312 return; 533 return;
313 } 534 }
314 msg = (const struct GetMessage*) message; 535 msg = (const struct GetMessage*) message;
536 GNUNET_SERVER_client_drop (client);
315 plugin->api->get (plugin->api->cls, 537 plugin->api->get (plugin->api->cls,
316 ((size == sizeof(struct GetMessage)) ? &msg->key : NULL), 538 ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
317 NULL, 539 NULL,
318 ntohl(msg->type), 540 ntohl(msg->type),
319 &transmit_item, 541 &transmit_item,
320 client); 542 client);
321 GNUNET_SERVER_receive_done (client, GNUNET_OK);
322} 543}
323 544
324 545
@@ -347,7 +568,6 @@ handle_update (void *cls,
347 &emsg); 568 &emsg);
348 transmit_status (client, ret, emsg); 569 transmit_status (client, ret, emsg);
349 GNUNET_free_non_null (emsg); 570 GNUNET_free_non_null (emsg);
350 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
351} 571}
352 572
353 573
@@ -363,20 +583,38 @@ handle_get_random (void *cls,
363 struct GNUNET_SERVER_Client *client, 583 struct GNUNET_SERVER_Client *client,
364 const struct GNUNET_MessageHeader *message) 584 const struct GNUNET_MessageHeader *message)
365{ 585{
586 GNUNET_SERVER_client_drop (client);
366 plugin->api->iter_migration_order (plugin->api->cls, 587 plugin->api->iter_migration_order (plugin->api->cls,
367 0, 588 0,
368 &transmit_item, 589 &transmit_item,
369 client); 590 client);
370 GNUNET_SERVER_receive_done (client, GNUNET_OK);
371} 591}
372 592
373 593
374/** 594/**
595 * Context for the 'remove_callback'.
596 */
597struct RemoveContext
598{
599 /**
600 * Client for whom we're doing the remvoing.
601 */
602 struct GNUNET_SERVER_Client *client;
603
604 /**
605 * GNUNET_YES if we managed to remove something.
606 */
607 int found;
608};
609
610
611/**
375 * Callback function that will cause the item that is passed 612 * Callback function that will cause the item that is passed
376 * in to be deleted (by returning GNUNET_NO). 613 * in to be deleted (by returning GNUNET_NO).
377 */ 614 */
378static int 615static int
379remove_callback (void *cls, 616remove_callback (void *cls,
617 void *next_cls,
380 const GNUNET_HashCode * key, 618 const GNUNET_HashCode * key,
381 uint32_t size, 619 uint32_t size,
382 const void *data, 620 const void *data,
@@ -386,8 +624,19 @@ remove_callback (void *cls,
386 struct GNUNET_TIME_Absolute 624 struct GNUNET_TIME_Absolute
387 expiration, unsigned long long uid) 625 expiration, unsigned long long uid)
388{ 626{
389 int *found = cls; 627 struct RemoveContext *rc = cls;
390 *found = GNUNET_YES; 628 if (key == NULL)
629 {
630 if (GNUNET_YES == rc->found)
631 transmit_status (rc->client, GNUNET_OK, NULL);
632 else
633 transmit_status (rc->client, GNUNET_SYSERR, _("Content not found"));
634 GNUNET_SERVER_client_drop (rc->client);
635 GNUNET_free (rc);
636 return GNUNET_OK; /* last item */
637 }
638 rc->found = GNUNET_YES;
639 plugin->api->next_request (next_cls, GNUNET_YES);
391 return GNUNET_NO; 640 return GNUNET_NO;
392} 641}
393 642
@@ -406,7 +655,7 @@ handle_remove (void *cls,
406{ 655{
407 const struct DataMessage *dm = check_data (message); 656 const struct DataMessage *dm = check_data (message);
408 GNUNET_HashCode vhash; 657 GNUNET_HashCode vhash;
409 int found; 658 struct RemoveContext *rc;
410 659
411 if (dm == NULL) 660 if (dm == NULL)
412 { 661 {
@@ -414,7 +663,9 @@ handle_remove (void *cls,
414 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 663 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
415 return; 664 return;
416 } 665 }
417 found = GNUNET_NO; 666 rc = GNUNET_malloc (sizeof(struct RemoveContext));
667 GNUNET_SERVER_client_keep (client);
668 rc->client = client;
418 GNUNET_CRYPTO_hash (&dm[1], 669 GNUNET_CRYPTO_hash (&dm[1],
419 ntohl(dm->size), 670 ntohl(dm->size),
420 &vhash); 671 &vhash);
@@ -423,12 +674,7 @@ handle_remove (void *cls,
423 &vhash, 674 &vhash,
424 ntohl(dm->type), 675 ntohl(dm->type),
425 &remove_callback, 676 &remove_callback,
426 &found); 677 rc);
427 if (GNUNET_YES == found)
428 transmit_status (client, GNUNET_OK, NULL);
429 else
430 transmit_status (client, GNUNET_SYSERR, _("Content not found"));
431 GNUNET_SERVER_receive_done (client, GNUNET_OK);
432} 678}
433 679
434 680
@@ -549,6 +795,23 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
549 795
550 796
551/** 797/**
798 * Function that removes all active reservations made
799 * by the given client and releases the space for other
800 * requests.
801 *
802 * @param cls closure
803 * @param client identification of the client
804 */
805static void
806cleanup_reservations (void *cls,
807 struct GNUNET_SERVER_Client
808 * client)
809{
810 /* FIXME */
811}
812
813
814/**
552 * Process datastore requests. 815 * Process datastore requests.
553 * 816 *
554 * @param cls closure 817 * @param cls closure
@@ -562,9 +825,20 @@ run (void *cls,
562 struct GNUNET_SERVER_Handle *server, 825 struct GNUNET_SERVER_Handle *server,
563 struct GNUNET_CONFIGURATION_Handle *cfg) 826 struct GNUNET_CONFIGURATION_Handle *cfg)
564{ 827{
828 if (GNUNET_OK !=
829 GNUNET_CONFIGURATION_get_value_number (cfg,
830 "DATASTORE", "QUOTA", &quota))
831 {
832 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
833 _("No `%s' specified for `%s' in configuration!\n"),
834 "QUOTA",
835 "DATASTORE");
836 return;
837 }
565 plugin = load_plugin (cfg, sched); 838 plugin = load_plugin (cfg, sched);
566 if (NULL == plugin) 839 if (NULL == plugin)
567 return; 840 return;
841 GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
568 GNUNET_SERVER_add_handlers (server, handlers); 842 GNUNET_SERVER_add_handlers (server, handlers);
569 GNUNET_SCHEDULER_add_delayed (sched, 843 GNUNET_SCHEDULER_add_delayed (sched,
570 GNUNET_YES, 844 GNUNET_YES,