aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2014-12-15 00:32:17 +0000
committerDavid Barksdale <amatus.amongus@gmail.com>2014-12-15 00:32:17 +0000
commitbd94aa6fe80a7687c3727ebcdb3ba5185d3b8b11 (patch)
tree02c23cc57494a495c9da5f8e88e282a376ba4016 /src/peerstore
parent6c8fa85819a2b02b3c4a175a08c1779283eda209 (diff)
downloadgnunet-bd94aa6fe80a7687c3727ebcdb3ba5185d3b8b11.tar.gz
gnunet-bd94aa6fe80a7687c3727ebcdb3ba5185d3b8b11.zip
Implement asynchronous peerstore plugin API
Resolves #3506
Diffstat (limited to 'src/peerstore')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c96
-rw-r--r--src/peerstore/plugin_peerstore_sqlite.c41
2 files changed, 104 insertions, 33 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index ed5b14eb9..f8ec631b9 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -136,6 +136,10 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
136} 136}
137 137
138 138
139/* Forward declaration */
140static void expire_records_continuation (void *cls, int success);
141
142
139/** 143/**
140 * Deletes any expired records from storage 144 * Deletes any expired records from storage
141 */ 145 */
@@ -143,14 +147,34 @@ static void
143cleanup_expired_records (void *cls, 147cleanup_expired_records (void *cls,
144 const struct GNUNET_SCHEDULER_TaskContext *tc) 148 const struct GNUNET_SCHEDULER_TaskContext *tc)
145{ 149{
146 int deleted; 150 int ret;
147 151
148 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 152 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
149 return; 153 return;
150 GNUNET_assert (NULL != db); 154 GNUNET_assert (NULL != db);
151 deleted = db->expire_records (db->cls, GNUNET_TIME_absolute_get ()); 155 ret = db->expire_records (db->cls, GNUNET_TIME_absolute_get (),
152 if (deleted > 0) 156 expire_records_continuation, NULL);
153 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted); 157 if (GNUNET_OK != ret)
158 {
159 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
160 (GNUNET_TIME_UNIT_SECONDS,
161 EXPIRED_RECORDS_CLEANUP_INTERVAL),
162 &cleanup_expired_records, NULL);
163 }
164}
165
166
167/**
168 * Continuation to expire_records called by the peerstore plugin
169 *
170 * @param cls unused
171 * @param success count of records deleted or #GNUNET_SYSERR
172 */
173static void
174expire_records_continuation(void *cls, int success)
175{
176 if (success > 0)
177 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", success);
154 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 178 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
155 (GNUNET_TIME_UNIT_SECONDS, 179 (GNUNET_TIME_UNIT_SECONDS,
156 EXPIRED_RECORDS_CLEANUP_INTERVAL), 180 EXPIRED_RECORDS_CLEANUP_INTERVAL),
@@ -217,15 +241,32 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
217static int 241static int
218record_iterator (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg) 242record_iterator (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
219{ 243{
220 struct GNUNET_SERVER_Client *client = cls; 244 struct GNUNET_PEERSTORE_Record *cls_record = cls;
221 struct StoreRecordMessage *srm; 245 struct StoreRecordMessage *srm;
222 246
247 if (NULL == record)
248 {
249 /* No more records */
250 struct GNUNET_MessageHeader *endmsg;
251
252 endmsg = GNUNET_new (struct GNUNET_MessageHeader);
253 endmsg->size = htons (sizeof (struct GNUNET_MessageHeader));
254 endmsg->type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
255 GNUNET_SERVER_notification_context_unicast (nc, cls_record->client, endmsg,
256 GNUNET_NO);
257 GNUNET_free (endmsg);
258 GNUNET_SERVER_receive_done (cls_record->client,
259 NULL == emsg ? GNUNET_OK : GNUNET_SYSERR);
260 PEERSTORE_destroy_record (cls_record);
261 return GNUNET_NO;
262 }
263
223 srm = 264 srm =
224 PEERSTORE_create_record_message (record->sub_system, record->peer, 265 PEERSTORE_create_record_message (record->sub_system, record->peer,
225 record->key, record->value, 266 record->key, record->value,
226 record->value_size, record->expiry, 267 record->value_size, record->expiry,
227 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); 268 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
228 GNUNET_SERVER_notification_context_unicast (nc, client, 269 GNUNET_SERVER_notification_context_unicast (nc, cls_record->client,
229 (struct GNUNET_MessageHeader *) 270 (struct GNUNET_MessageHeader *)
230 srm, GNUNET_NO); 271 srm, GNUNET_NO);
231 GNUNET_free (srm); 272 GNUNET_free (srm);
@@ -334,7 +375,6 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client,
334 const struct GNUNET_MessageHeader *message) 375 const struct GNUNET_MessageHeader *message)
335{ 376{
336 struct GNUNET_PEERSTORE_Record *record; 377 struct GNUNET_PEERSTORE_Record *record;
337 struct GNUNET_MessageHeader *endmsg;
338 378
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request.\n"); 379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request.\n");
340 record = PEERSTORE_parse_record_message (message); 380 record = PEERSTORE_parse_record_message (message);
@@ -358,21 +398,32 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client,
358 (NULL == record->peer) ? "NULL" : GNUNET_i2s (record->peer), 398 (NULL == record->peer) ? "NULL" : GNUNET_i2s (record->peer),
359 (NULL == record->key) ? "NULL" : record->key); 399 (NULL == record->key) ? "NULL" : record->key);
360 GNUNET_SERVER_notification_context_add (nc, client); 400 GNUNET_SERVER_notification_context_add (nc, client);
361 if (GNUNET_OK == 401 record->client = client;
402 if (GNUNET_OK !=
362 db->iterate_records (db->cls, record->sub_system, record->peer, 403 db->iterate_records (db->cls, record->sub_system, record->peer,
363 record->key, &record_iterator, client)) 404 record->key, &record_iterator, record))
364 { 405 {
365 endmsg = GNUNET_new (struct GNUNET_MessageHeader); 406 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
366 407 PEERSTORE_destroy_record (record);
367 endmsg->size = htons (sizeof (struct GNUNET_MessageHeader));
368 endmsg->type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
369 GNUNET_SERVER_notification_context_unicast (nc, client, endmsg, GNUNET_NO);
370 GNUNET_free (endmsg);
371 GNUNET_SERVER_receive_done (client, GNUNET_OK);
372 } 408 }
373 else 409}
410
411
412/**
413 * Continuation of store_record called by the peerstore plugin
414 *
415 * @param cls closure
416 * @param success result
417 */
418static void
419store_record_continuation (void *cls, int success)
420{
421 struct GNUNET_PEERSTORE_Record *record = cls;
422
423 GNUNET_SERVER_receive_done (record->client, success);
424 if (GNUNET_OK == success)
374 { 425 {
375 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 426 watch_notifier (record);
376 } 427 }
377 PEERSTORE_destroy_record (record); 428 PEERSTORE_destroy_record (record);
378} 429}
@@ -418,20 +469,19 @@ handle_store (void *cls, struct GNUNET_SERVER_Client *client,
418 " Options: %d.\n", 469 " Options: %d.\n",
419 record->value_size, record->sub_system, GNUNET_i2s (record->peer), 470 record->value_size, record->sub_system, GNUNET_i2s (record->peer),
420 record->key, record->value_size, ntohl (srm->options)); 471 record->key, record->value_size, ntohl (srm->options));
472 record->client = client;
421 if (GNUNET_OK != 473 if (GNUNET_OK !=
422 db->store_record (db->cls, record->sub_system, record->peer, record->key, 474 db->store_record (db->cls, record->sub_system, record->peer, record->key,
423 record->value, record->value_size, *record->expiry, 475 record->value, record->value_size, *record->expiry,
424 ntohl (srm->options))) 476 ntohl (srm->options), store_record_continuation,
477 record))
425 { 478 {
426 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 479 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
427 _("Failed to store requested value, sqlite database error.")); 480 _("Failed to store requested value, database error."));
428 PEERSTORE_destroy_record (record); 481 PEERSTORE_destroy_record (record);
429 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 482 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
430 return; 483 return;
431 } 484 }
432 GNUNET_SERVER_receive_done (client, GNUNET_OK);
433 watch_notifier (record);
434 PEERSTORE_destroy_record (record);
435} 485}
436 486
437 487
diff --git a/src/peerstore/plugin_peerstore_sqlite.c b/src/peerstore/plugin_peerstore_sqlite.c
index fc644d9b7..cd402aaae 100644
--- a/src/peerstore/plugin_peerstore_sqlite.c
+++ b/src/peerstore/plugin_peerstore_sqlite.c
@@ -160,10 +160,15 @@ peerstore_sqlite_delete_records (void *cls, const char *sub_system,
160 * 160 *
161 * @param cls closure (internal context for the plugin) 161 * @param cls closure (internal context for the plugin)
162 * @param now time to use as reference 162 * @param now time to use as reference
163 * @return number of records deleted 163 * @param cont continuation called with the number of records expired
164 * @param cont_cls continuation closure
165 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and cont is not
166 * called
164 */ 167 */
165static int 168static int
166peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now) 169peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now,
170 GNUNET_PEERSTORE_Continuation cont,
171 void *cont_cls)
167{ 172{
168 struct Plugin *plugin = cls; 173 struct Plugin *plugin = cls;
169 sqlite3_stmt *stmt = plugin->expire_peerstoredata; 174 sqlite3_stmt *stmt = plugin->expire_peerstoredata;
@@ -183,9 +188,13 @@ peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now)
183 { 188 {
184 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 189 LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
185 "sqlite3_reset"); 190 "sqlite3_reset");
186 return 0; 191 return GNUNET_SYSERR;
187 } 192 }
188 return sqlite3_changes (plugin->dbh); 193 if (NULL != cont)
194 {
195 cont (cont_cls, sqlite3_changes (plugin->dbh));
196 }
197 return GNUNET_OK;
189} 198}
190 199
191 200
@@ -197,9 +206,11 @@ peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now)
197 * @param sub_system name of sub system 206 * @param sub_system name of sub system
198 * @param peer Peer identity (can be NULL) 207 * @param peer Peer identity (can be NULL)
199 * @param key entry key string (can be NULL) 208 * @param key entry key string (can be NULL)
200 * @param iter function to call with the result 209 * @param iter function to call asynchronously with the results, terminated
210 * by a NULL result
201 * @param iter_cls closure for @a iter 211 * @param iter_cls closure for @a iter
202 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error 212 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and iter is not
213 * called
203 */ 214 */
204static int 215static int
205peerstore_sqlite_iterate_records (void *cls, const char *sub_system, 216peerstore_sqlite_iterate_records (void *cls, const char *sub_system,
@@ -296,8 +307,10 @@ peerstore_sqlite_iterate_records (void *cls, const char *sub_system,
296 "sqlite3_reset"); 307 "sqlite3_reset");
297 err = 1; 308 err = 1;
298 } 309 }
299 if (err) 310 if (NULL != iter)
300 return GNUNET_SYSERR; 311 {
312 iter (iter_cls, NULL, err ? "sqlite error" : NULL);
313 }
301 return GNUNET_OK; 314 return GNUNET_OK;
302} 315}
303 316
@@ -315,14 +328,18 @@ peerstore_sqlite_iterate_records (void *cls, const char *sub_system,
315 * @param size size of value to be stored 328 * @param size size of value to be stored
316 * @param expiry absolute time after which the record is (possibly) deleted 329 * @param expiry absolute time after which the record is (possibly) deleted
317 * @param options options related to the store operation 330 * @param options options related to the store operation
318 * @return #GNUNET_OK on success, else #GNUNET_SYSERR 331 * @param cont continuation called when record is stored
332 * @param cont_cls continuation closure
333 * @return #GNUNET_OK on success, else #GNUNET_SYSERR and cont is not called
319 */ 334 */
320static int 335static int
321peerstore_sqlite_store_record (void *cls, const char *sub_system, 336peerstore_sqlite_store_record (void *cls, const char *sub_system,
322 const struct GNUNET_PeerIdentity *peer, 337 const struct GNUNET_PeerIdentity *peer,
323 const char *key, const void *value, size_t size, 338 const char *key, const void *value, size_t size,
324 struct GNUNET_TIME_Absolute expiry, 339 struct GNUNET_TIME_Absolute expiry,
325 enum GNUNET_PEERSTORE_StoreOption options) 340 enum GNUNET_PEERSTORE_StoreOption options,
341 GNUNET_PEERSTORE_Continuation cont,
342 void *cont_cls)
326{ 343{
327 struct Plugin *plugin = cls; 344 struct Plugin *plugin = cls;
328 sqlite3_stmt *stmt = plugin->insert_peerstoredata; 345 sqlite3_stmt *stmt = plugin->insert_peerstoredata;
@@ -355,6 +372,10 @@ peerstore_sqlite_store_record (void *cls, const char *sub_system,
355 "sqlite3_reset"); 372 "sqlite3_reset");
356 return GNUNET_SYSERR; 373 return GNUNET_SYSERR;
357 } 374 }
375 if (NULL != cont)
376 {
377 cont (cont_cls, GNUNET_OK);
378 }
358 return GNUNET_OK; 379 return GNUNET_OK;
359} 380}
360 381