aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2009-07-18 21:38:14 +0000
committerChristian Grothoff <christian@grothoff.org>2009-07-18 21:38:14 +0000
commitb103d434085de5070880f3bcb025cda5fff6836d (patch)
tree789bf45f4ea9d5bd560bd76330f42dc0259b6b04 /src/datastore/datastore_api.c
parent5ee114a710348292e44f1f7865095381df3166fc (diff)
downloadgnunet-b103d434085de5070880f3bcb025cda5fff6836d.tar.gz
gnunet-b103d434085de5070880f3bcb025cda5fff6836d.zip
work on datastore API implementation
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c313
1 files changed, 225 insertions, 88 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 5e87affaf..951cf2651 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -28,64 +28,51 @@
28#include "gnunet_datastore_service.h" 28#include "gnunet_datastore_service.h"
29#include "datastore.h" 29#include "datastore.h"
30 30
31
32/** 31/**
33 * 32 * Handle to the datastore service. Followed
33 * by 65536 bytes used for storing messages.
34 */ 34 */
35struct MessageQueue 35struct GNUNET_DATASTORE_Handle
36{ 36{
37 /**
38 * This is a linked list.
39 */
40 struct MessageQueue *next;
41 37
42 /** 38 /**
43 * Message we will transmit (allocated at the end 39 * Our configuration.
44 * of this struct; do not free!).
45 */ 40 */
46 struct GNUNET_MessageHeader *msg; 41 struct GNUNET_CONFIGURATION_Handle *cfg;
47 42
48 /** 43 /**
49 * Function to call on the response. 44 * Our scheduler.
50 */ 45 */
51 GNUNET_CLIENT_MessageHandler response_processor; 46 struct GNUNET_SCHEDULER_Handle *sched;
52
53 /**
54 * Closure for response_processor.
55 */
56 void *response_processor_cls;
57
58};
59
60
61/**
62 * Handle to the datastore service.
63 */
64struct GNUNET_DATASTORE_Handle
65{
66 47
67 /** 48 /**
68 * Current connection to the datastore service. 49 * Current connection to the datastore service.
69 */ 50 */
70 struct GNUNET_CLIENT_Connection *client; 51 struct GNUNET_CLIENT_Connection *client;
71
72 /**
73 * Linked list of messages waiting to be transmitted.
74 */
75 struct MessageQueue *messages;
76 52
77 /** 53 /**
78 * Current response processor (NULL if we are not waiting 54 * Current response processor (NULL if we are not waiting for a
79 * for a response). Largely used only to know if we have 55 * response). The specific type depends on the kind of message we
80 * a 'receive' request pending. 56 * just transmitted.
81 */ 57 */
82 GNUNET_CLIENT_MessageHandler response_proc; 58 void *response_proc;
83 59
84 /** 60 /**
85 * Closure for response_proc. 61 * Closure for response_proc.
86 */ 62 */
87 void *response_proc_cls; 63 void *response_proc_cls;
88 64
65 /**
66 * Timeout for the current operation.
67 */
68 struct GNUNET_TIME_Absolute timeout;
69
70 /**
71 * Number of bytes in the message following
72 * this struct, 0 if we have no request pending.
73 */
74 size_t message_size;
75
89}; 76};
90 77
91 78
@@ -109,8 +96,11 @@ struct GNUNET_DATASTORE_Handle *GNUNET_DATASTORE_connect (struct
109 c = GNUNET_CLIENT_connect (sched, "datastore", cfg); 96 c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
110 if (c == NULL) 97 if (c == NULL)
111 return NULL; /* oops */ 98 return NULL; /* oops */
112 h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle)); 99 h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) +
100 GNUNET_SERVER_MAX_MESSAGE_SIZE);
113 h->client = c; 101 h->client = c;
102 h->cfg = cfg;
103 h->sched = sched;
114 return h; 104 return h;
115} 105}
116 106
@@ -124,7 +114,7 @@ transmit_drop (void *cls,
124{ 114{
125 struct GNUNET_DATASTORE_Handle *h = cls; 115 struct GNUNET_DATASTORE_Handle *h = cls;
126 struct GNUNET_MessageHeader *hdr; 116 struct GNUNET_MessageHeader *hdr;
127 117
128 if (buf == NULL) 118 if (buf == NULL)
129 { 119 {
130 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 120 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -151,7 +141,10 @@ transmit_drop (void *cls,
151void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, 141void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
152 int drop) 142 int drop)
153{ 143{
154 if (GNUNET_YES == drop) 144 GNUNET_assert (0 == h->message_size);
145 GNUNET_assert (NULL == h->response_proc);
146 if ( (GNUNET_YES == drop) &&
147 (h->client != NULL) )
155 { 148 {
156 if (NULL != 149 if (NULL !=
157 GNUNET_CLIENT_notify_transmit_ready (h->client, 150 GNUNET_CLIENT_notify_transmit_ready (h->client,
@@ -162,51 +155,157 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
162 return; 155 return;
163 GNUNET_break (0); 156 GNUNET_break (0);
164 } 157 }
165 GNUNET_CLIENT_disconnect (h->client); 158 if (h->client != NULL)
159 GNUNET_CLIENT_disconnect (h->client);
166 GNUNET_free (h); 160 GNUNET_free (h);
167} 161}
168 162
169 163
170/** 164/**
171 * The closure is followed by the data message. 165 * Type of a function to call when we receive a message
166 * from the service. This specific function is used
167 * to handle messages of type "struct StatusMessage".
168 *
169 * @param cls closure
170 * @param msg message received, NULL on timeout or fatal error
172 */ 171 */
173struct PutClosure 172static void
173with_status_response_handler (void *cls,
174 const struct
175 GNUNET_MessageHeader * msg)
174{ 176{
175 struct GNUNET_DATASTORE_Handle *h; 177 struct GNUNET_DATASTORE_Handle *h = cls;
176 GNUNET_DATASTORE_ContinuationWithStatus cont; 178 GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc;
177 void *cont_cls; 179 const struct StatusMessage *sm;
178}; 180 const char *emsg;
181 int status;
182
183 if (msg == NULL)
184 {
185 h->response_proc = NULL;
186 GNUNET_CLIENT_disconnect (h->client);
187 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
188 cont (h->response_proc_cls,
189 GNUNET_SYSERR,
190 _("Timeout trying to read response from datastore service\n"));
191 return;
192 }
193 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
194 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
195 {
196 GNUNET_break (0);
197 GNUNET_CLIENT_disconnect (h->client);
198 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
199 cont (h->response_proc_cls,
200 GNUNET_SYSERR,
201 _("Error reading response from datastore service\n"));
202 return;
203 }
204 sm = (const struct StatusMessage*) msg;
205 status = ntohl(sm->status);
206 emsg = NULL;
207 if (status == GNUNET_SYSERR)
208 {
209 emsg = (const char*) &sm[1];
210 if ( (ntohs(msg->size) == sizeof(struct StatusMessage)) ||
211 (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0') )
212 {
213 GNUNET_break (0);
214 emsg = _("Invalid error message received from datastore service");
215 }
216 }
217 h->response_proc = NULL;
218 cont (h->response_proc_cls,
219 status,
220 emsg);
221}
179 222
180 223
181/** 224/**
182 * Transmit PUT message to Database service. 225 * Transmit message to datastore service and then
226 * read a status message.
227 *
228 * @param cls closure with handle to datastore
229 * @param size number of bytes we can transmit at most
230 * @param buf where to write transmission, NULL on
231 * timeout
232 * @return number of bytes copied to buf
183 */ 233 */
184static size_t 234static size_t
185transmit_put (void *cls, 235transmit_get_status (void *cls,
186 size_t size, void *buf) 236 size_t size,
237 void *buf)
187{ 238{
188 struct PutClosure *pc = cls; 239 struct GNUNET_DATASTORE_Handle *h = cls;
189 struct DataMessage *dm; 240 GNUNET_DATASTORE_ContinuationWithStatus cont = h->response_proc;
190 uint16_t msize; 241 uint16_t msize;
191 242
243 h->response_proc = NULL;
192 if (buf == NULL) 244 if (buf == NULL)
193 { 245 {
194 pc->cont (pc->cont_cls, GNUNET_SYSERR, 246 h->message_size = 0;
195 gettext_noop ("Error transmitting `PUT' message to datastore service.\n")); 247 cont (h->response_proc_cls,
196 GNUNET_free (pc); 248 GNUNET_SYSERR,
249 gettext_noop ("Error transmitting message to datastore service.\n"));
197 return 0; 250 return 0;
198 } 251 }
199 dm = (struct DataMessage*) &pc[1]; 252 GNUNET_assert (h->message_size <= size);
200 msize = ntohs(dm->size); 253 memcpy (buf, &h[1], h->message_size);
201 GNUNET_assert (msize <= size); 254 h->message_size = 0;
202 memcpy (buf, dm, msize); 255 GNUNET_CLIENT_receive (h->client,
203 /* FIXME: wait for response from datastore, then 256 &with_status_response_handler,
204 call our continuation! */ 257 h,
258 GNUNET_TIME_absolute_get_remaining (h->timeout));
205 return msize; 259 return msize;
206} 260}
207 261
208 262
209/** 263/**
264 * Helper function that will initiate the
265 * transmission of a message to the datastore
266 * service. The message must already be prepared
267 * and stored in the buffer at the end of the
268 * handle. The message must be of a type that
269 * expects a "StatusMessage" in response.
270 *
271 * @param h handle to the service with prepared message
272 * @param cont function to call with result
273 * @param cont_cls closure
274 * @param timeout timeout for the operation
275 */
276static void
277transmit_for_status (struct GNUNET_DATASTORE_Handle *h,
278 GNUNET_DATASTORE_ContinuationWithStatus cont,
279 void *cont_cls,
280 struct GNUNET_TIME_Relative timeout)
281{
282 const struct GNUNET_MessageHeader *hdr;
283 uint16_t msize;
284
285 hdr = (const struct GNUNET_MessageHeader*) &h[1];
286 msize = ntohs(hdr->size);
287 GNUNET_assert (h->response_proc == NULL);
288 h->response_proc = cont;
289 h->response_proc_cls = cont_cls;
290 h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
291 h->message_size = msize;
292 if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client,
293 msize,
294 timeout,
295 &transmit_get_status,
296 h))
297 {
298 GNUNET_break (0);
299 h->response_proc = NULL;
300 h->message_size = 0;
301 cont (cont_cls,
302 GNUNET_SYSERR,
303 gettext_noop ("Not ready to transmit request to datastore service"));
304 }
305}
306
307
308/**
210 * Store an item in the datastore. If the item is already present, 309 * Store an item in the datastore. If the item is already present,
211 * the priorities are summed up and the higher expiration time and 310 * the priorities are summed up and the higher expiration time and
212 * lower anonymity level is used. 311 * lower anonymity level is used.
@@ -237,18 +336,14 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
237 GNUNET_DATASTORE_ContinuationWithStatus cont, 336 GNUNET_DATASTORE_ContinuationWithStatus cont,
238 void *cont_cls) 337 void *cont_cls)
239{ 338{
240 struct PutClosure *pc;
241 struct DataMessage *dm; 339 struct DataMessage *dm;
340 size_t msize;
242 341
243 pc = GNUNET_malloc (sizeof(struct PutClosure) + 342 msize = sizeof(struct DataMessage) + size;
244 sizeof(struct DataMessage) + 343 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
245 size); 344 dm = (struct DataMessage*) &h[1];
246 dm = (struct DataMessage*) &pc[1];
247 pc->h = h;
248 pc->cont = cont;
249 pc->cont_cls = cont_cls;
250 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT); 345 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
251 dm->header.size = htons(sizeof(struct DataMessage) + size); 346 dm->header.size = htons(msize);
252 dm->rid = htonl(rid); 347 dm->rid = htonl(rid);
253 dm->size = htonl(size); 348 dm->size = htonl(size);
254 dm->type = htonl(type); 349 dm->type = htonl(type);
@@ -258,16 +353,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
258 dm->expiration = GNUNET_TIME_absolute_hton(expiration); 353 dm->expiration = GNUNET_TIME_absolute_hton(expiration);
259 dm->key = *key; 354 dm->key = *key;
260 memcpy (&dm[1], data, size); 355 memcpy (&dm[1], data, size);
261 if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client, 356 transmit_for_status (h, cont, cont_cls, timeout);
262 sizeof(struct DataMessage) + size,
263 timeout,
264 &transmit_put,
265 pc))
266 {
267 GNUNET_break (0);
268 cont (cont_cls, GNUNET_SYSERR,
269 gettext_noop ("Not ready to transmit request to datastore service"));
270 }
271} 357}
272 358
273 359
@@ -282,15 +368,25 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
282 * @param cont continuation to call when done; "success" will be set to 368 * @param cont continuation to call when done; "success" will be set to
283 * a positive reservation value if space could be reserved. 369 * a positive reservation value if space could be reserved.
284 * @param cont_cls closure for cont 370 * @param cont_cls closure for cont
371 * @param timeout how long to wait at most for a response
285 */ 372 */
286void 373void
287GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, 374GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
288 uint64_t amount, 375 uint64_t amount,
289 uint64_t entries, 376 uint64_t entries,
290 GNUNET_DATASTORE_ContinuationWithStatus cont, 377 GNUNET_DATASTORE_ContinuationWithStatus cont,
291 void *cont_cls) 378 void *cont_cls,
379 struct GNUNET_TIME_Relative timeout)
292{ 380{
293 cont (cont_cls, GNUNET_SYSERR, "not implemented"); 381 struct ReserveMessage *rm;
382
383 rm = (struct ReserveMessage*) &h[1];
384 rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
385 rm->header.size = htons(sizeof (struct ReserveMessage));
386 rm->reserved = htonl(0);
387 rm->amount = htonl(amount);
388 rm->entries = htonl(entries);
389 transmit_for_status (h, cont, cont_cls, timeout);
294} 390}
295 391
296 392
@@ -304,14 +400,22 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
304 * from the "reserve" function). 400 * from the "reserve" function).
305 * @param cont continuation to call when done 401 * @param cont continuation to call when done
306 * @param cont_cls closure for cont 402 * @param cont_cls closure for cont
403 * @param timeout how long to wait at most for a response
307 */ 404 */
308void 405void
309GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, 406GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
310 int rid, 407 int rid,
311 GNUNET_DATASTORE_ContinuationWithStatus cont, 408 GNUNET_DATASTORE_ContinuationWithStatus cont,
312 void *cont_cls) 409 void *cont_cls,
410 struct GNUNET_TIME_Relative timeout)
313{ 411{
314 cont (cont_cls, GNUNET_OK, NULL); 412 struct ReleaseReserveMessage *rrm;
413
414 rrm = (struct ReleaseReserveMessage*) &h[1];
415 rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
416 rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
417 rrm->rid = htonl(rid);
418 transmit_for_status (h, cont, cont_cls, timeout);
315} 419}
316 420
317 421
@@ -324,6 +428,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
324 * @param expiration new expiration value should be MAX of existing and this argument 428 * @param expiration new expiration value should be MAX of existing and this argument
325 * @param cont continuation to call when done 429 * @param cont continuation to call when done
326 * @param cont_cls closure for cont 430 * @param cont_cls closure for cont
431 * @param timeout how long to wait at most for a response
327 */ 432 */
328void 433void
329GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, 434GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
@@ -331,9 +436,18 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
331 uint32_t priority, 436 uint32_t priority,
332 struct GNUNET_TIME_Absolute expiration, 437 struct GNUNET_TIME_Absolute expiration,
333 GNUNET_DATASTORE_ContinuationWithStatus cont, 438 GNUNET_DATASTORE_ContinuationWithStatus cont,
334 void *cont_cls) 439 void *cont_cls,
440 struct GNUNET_TIME_Relative timeout)
335{ 441{
336 cont (cont_cls, GNUNET_SYSERR, "not implemented"); 442 struct UpdateMessage *um;
443
444 um = (struct UpdateMessage*) &h[1];
445 um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
446 um->header.size = htons(sizeof (struct UpdateMessage));
447 um->priority = htonl(priority);
448 um->expiration = GNUNET_TIME_absolute_hton(expiration);
449 um->uid = GNUNET_htonll(uid);
450 transmit_for_status (h, cont, cont_cls, timeout);
337} 451}
338 452
339 453
@@ -347,12 +461,14 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
347 * @param iter function to call on each matching value; 461 * @param iter function to call on each matching value;
348 * will be called once with a NULL value at the end 462 * will be called once with a NULL value at the end
349 * @param iter_cls closure for iter 463 * @param iter_cls closure for iter
464 * @param timeout how long to wait at most for a response
350 */ 465 */
351void 466void
352GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, 467GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
353 const GNUNET_HashCode * key, 468 const GNUNET_HashCode * key,
354 uint32_t type, 469 uint32_t type,
355 GNUNET_DATASTORE_Iterator iter, void *iter_cls) 470 GNUNET_DATASTORE_Iterator iter, void *iter_cls,
471 struct GNUNET_TIME_Relative timeout)
356{ 472{
357 static struct GNUNET_TIME_Absolute zero; 473 static struct GNUNET_TIME_Absolute zero;
358 iter (iter_cls, 474 iter (iter_cls,
@@ -368,10 +484,12 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
368 * will be called exactly once; if no values 484 * will be called exactly once; if no values
369 * are available, the value will be NULL. 485 * are available, the value will be NULL.
370 * @param iter_cls closure for iter 486 * @param iter_cls closure for iter
487 * @param timeout how long to wait at most for a response
371 */ 488 */
372void 489void
373GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, 490GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
374 GNUNET_DATASTORE_Iterator iter, void *iter_cls) 491 GNUNET_DATASTORE_Iterator iter, void *iter_cls,
492 struct GNUNET_TIME_Relative timeout)
375{ 493{
376 static struct GNUNET_TIME_Absolute zero; 494 static struct GNUNET_TIME_Absolute zero;
377 495
@@ -389,15 +507,34 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
389 * @param data content stored 507 * @param data content stored
390 * @param cont continuation to call when done 508 * @param cont continuation to call when done
391 * @param cont_cls closure for cont 509 * @param cont_cls closure for cont
510 * @param timeout how long to wait at most for a response
392 */ 511 */
393void 512void
394GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, 513GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
395 const GNUNET_HashCode * key, 514 const GNUNET_HashCode * key,
396 uint32_t size, const void *data, 515 uint32_t size, const void *data,
397 GNUNET_DATASTORE_ContinuationWithStatus cont, 516 GNUNET_DATASTORE_ContinuationWithStatus cont,
398 void *cont_cls) 517 void *cont_cls,
518 struct GNUNET_TIME_Relative timeout)
399{ 519{
400 cont (cont_cls, GNUNET_SYSERR, "not implemented"); 520 struct DataMessage *dm;
521 size_t msize;
522
523 msize = sizeof(struct DataMessage) + size;
524 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
525 dm = (struct DataMessage*) &h[1];
526 dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
527 dm->header.size = htons(msize);
528 dm->rid = htonl(0);
529 dm->size = htonl(size);
530 dm->type = htonl(0);
531 dm->priority = htonl(0);
532 dm->anonymity = htonl(0);
533 dm->uid = GNUNET_htonll(0);
534 dm->expiration.value = 0;
535 dm->key = *key;
536 memcpy (&dm[1], data, size);
537 transmit_for_status (h, cont, cont_cls, timeout);
401} 538}
402 539
403 540