aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
authorDavid Barksdale <amatus@amat.us>2017-03-22 22:17:05 -0500
committerDavid Barksdale <amatus@amat.us>2017-03-22 22:19:13 -0500
commit78ecfccd774a77ae3d7a51e3f5c7c7c86cf7985b (patch)
tree1dc23a2f6d78c8026e69181ac90055929d79bba8 /src/datastore/plugin_datastore_postgres.c
parentaa98f144e6db0da5a0a4cad83fe64a80bbab6692 (diff)
downloadgnunet-78ecfccd774a77ae3d7a51e3f5c7c7c86cf7985b.tar.gz
gnunet-78ecfccd774a77ae3d7a51e3f5c7c7c86cf7985b.zip
[datastore] Return and update replication
This fixes a couple FIXMEs in the datastore code. The replication value is now returned from the datastore and the update function can increase the replication.
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c112
1 files changed, 61 insertions, 51 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 0376ebb6c..87a7acbdc 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -176,9 +176,10 @@ init_connection (struct Plugin *plugin)
176 return GNUNET_SYSERR; 176 return GNUNET_SYSERR;
177 } 177 }
178 PQclear (ret); 178 PQclear (ret);
179#define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid"
179 if ((GNUNET_OK != 180 if ((GNUNET_OK !=
180 GNUNET_POSTGRES_prepare (plugin->dbh, "get", 181 GNUNET_POSTGRES_prepare (plugin->dbh, "get",
181 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 182 "SELECT " RESULT_COLUMNS " FROM gn090 "
182 "WHERE oid >= $1::bigint AND " 183 "WHERE oid >= $1::bigint AND "
183 "(rvalue >= $2 OR 0 = $3::smallint) AND " 184 "(rvalue >= $2 OR 0 = $3::smallint) AND "
184 "(hash = $4 OR 0 = $5::smallint) AND " 185 "(hash = $4 OR 0 = $5::smallint) AND "
@@ -191,28 +192,33 @@ init_connection (struct Plugin *plugin)
191 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) || 192 "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) ||
192 (GNUNET_OK != 193 (GNUNET_OK !=
193 GNUNET_POSTGRES_prepare (plugin->dbh, "update", 194 GNUNET_POSTGRES_prepare (plugin->dbh, "update",
194 "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END " 195 "UPDATE gn090 "
195 "WHERE oid = $3", 3)) || 196 "SET prio = prio + $1, "
197 "repl = repl + $2, "
198 "expire = CASE WHEN expire < $3 THEN $3 ELSE expire END "
199 "WHERE oid = $4", 4)) ||
196 (GNUNET_OK != 200 (GNUNET_OK !=
197 GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl", 201 GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl",
198 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) " 202 "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
199 "WHERE oid = $1", 1)) || 203 "WHERE oid = $1", 1)) ||
200 (GNUNET_OK != 204 (GNUNET_OK !=
201 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous", 205 GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous",
202 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 206 "SELECT " RESULT_COLUMNS " FROM gn090 "
203 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint " 207 "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint "
204 "ORDER BY oid ASC LIMIT 1", 208 "ORDER BY oid ASC LIMIT 1",
205 2)) || 209 2)) ||
206 (GNUNET_OK != 210 (GNUNET_OK !=
207 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order", 211 GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order",
208 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 212 "(SELECT " RESULT_COLUMNS " FROM gn090 "
209 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION " 213 "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
210 "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 214 "UNION "
211 "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1", 215 "(SELECT " RESULT_COLUMNS " FROM gn090 "
216 "ORDER BY prio ASC LIMIT 1) "
217 "ORDER BY expire ASC LIMIT 1",
212 1)) || 218 1)) ||
213 (GNUNET_OK != 219 (GNUNET_OK !=
214 GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order", 220 GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order",
215 "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " 221 "SELECT " RESULT_COLUMNS " FROM gn090 "
216 "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) || 222 "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) ||
217 (GNUNET_OK != 223 (GNUNET_OK !=
218 GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) || 224 GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) ||
@@ -371,19 +377,21 @@ process_result (struct Plugin *plugin,
371 uint32_t rowid; 377 uint32_t rowid;
372 uint32_t utype; 378 uint32_t utype;
373 uint32_t anonymity; 379 uint32_t anonymity;
380 uint32_t replication;
374 uint32_t priority; 381 uint32_t priority;
375 size_t size; 382 size_t size;
376 void *data; 383 void *data;
377 struct GNUNET_TIME_Absolute expiration_time; 384 struct GNUNET_TIME_Absolute expiration_time;
378 struct GNUNET_HashCode key; 385 struct GNUNET_HashCode key;
379 struct GNUNET_PQ_ResultSpec rs[] = { 386 struct GNUNET_PQ_ResultSpec rs[] = {
387 GNUNET_PQ_result_spec_uint32 ("repl", &replication),
380 GNUNET_PQ_result_spec_uint32 ("type", &utype), 388 GNUNET_PQ_result_spec_uint32 ("type", &utype),
381 GNUNET_PQ_result_spec_uint32 ("prio", &priority), 389 GNUNET_PQ_result_spec_uint32 ("prio", &priority),
382 GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), 390 GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity),
383 GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
384 GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), 391 GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time),
385 GNUNET_PQ_result_spec_auto_from_type ("hash", &key), 392 GNUNET_PQ_result_spec_auto_from_type ("hash", &key),
386 GNUNET_PQ_result_spec_variable_size ("value", &data, &size), 393 GNUNET_PQ_result_spec_variable_size ("value", &data, &size),
394 GNUNET_PQ_result_spec_uint32 ("oid", &rowid),
387 GNUNET_PQ_result_spec_end 395 GNUNET_PQ_result_spec_end
388 }; 396 };
389 397
@@ -398,8 +406,7 @@ process_result (struct Plugin *plugin,
398 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 406 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
399 "datastore-postgres", 407 "datastore-postgres",
400 "Ending iteration (postgres error)\n"); 408 "Ending iteration (postgres error)\n");
401 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 409 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
402 GNUNET_TIME_UNIT_ZERO_ABS, 0);
403 return; 410 return;
404 } 411 }
405 412
@@ -409,16 +416,14 @@ process_result (struct Plugin *plugin,
409 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, 416 GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
410 "datastore-postgres", 417 "datastore-postgres",
411 "Ending iteration (no more results)\n"); 418 "Ending iteration (no more results)\n");
412 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 419 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
413 GNUNET_TIME_UNIT_ZERO_ABS, 0);
414 PQclear (res); 420 PQclear (res);
415 return; 421 return;
416 } 422 }
417 if (1 != PQntuples (res)) 423 if (1 != PQntuples (res))
418 { 424 {
419 GNUNET_break (0); 425 GNUNET_break (0);
420 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 426 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
421 GNUNET_TIME_UNIT_ZERO_ABS, 0);
422 PQclear (res); 427 PQclear (res);
423 return; 428 return;
424 } 429 }
@@ -432,8 +437,7 @@ process_result (struct Plugin *plugin,
432 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, 437 GNUNET_POSTGRES_delete_by_rowid (plugin->dbh,
433 "delrow", 438 "delrow",
434 rowid); 439 rowid);
435 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 440 proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
436 GNUNET_TIME_UNIT_ZERO_ABS, 0);
437 return; 441 return;
438 } 442 }
439 443
@@ -443,14 +447,15 @@ process_result (struct Plugin *plugin,
443 (unsigned int) size, 447 (unsigned int) size,
444 (unsigned int) utype); 448 (unsigned int) utype);
445 iret = proc (proc_cls, 449 iret = proc (proc_cls,
446 &key, 450 &key,
447 size, 451 size,
448 data, 452 data,
449 (enum GNUNET_BLOCK_Type) utype, 453 (enum GNUNET_BLOCK_Type) utype,
450 priority, 454 priority,
451 anonymity, 455 anonymity,
452 expiration_time, 456 replication,
453 rowid); 457 expiration_time,
458 rowid);
454 PQclear (res); 459 PQclear (res);
455 if (iret == GNUNET_NO) 460 if (iret == GNUNET_NO)
456 { 461 {
@@ -620,6 +625,7 @@ struct ReplCtx
620 * @param type type of the content 625 * @param type type of the content
621 * @param priority priority of the content 626 * @param priority priority of the content
622 * @param anonymity anonymity-level for the content 627 * @param anonymity anonymity-level for the content
628 * @param replication replication-level for the content
623 * @param expiration expiration time for the content 629 * @param expiration expiration time for the content
624 * @param uid unique identifier for the datum; 630 * @param uid unique identifier for the datum;
625 * maybe 0 if no unique identifier is available 631 * maybe 0 if no unique identifier is available
@@ -630,13 +636,14 @@ struct ReplCtx
630 */ 636 */
631static int 637static int
632repl_proc (void *cls, 638repl_proc (void *cls,
633 const struct GNUNET_HashCode *key, 639 const struct GNUNET_HashCode *key,
634 uint32_t size, 640 uint32_t size,
635 const void *data, 641 const void *data,
636 enum GNUNET_BLOCK_Type type, 642 enum GNUNET_BLOCK_Type type,
637 uint32_t priority, 643 uint32_t priority,
638 uint32_t anonymity, 644 uint32_t anonymity,
639 struct GNUNET_TIME_Absolute expiration, 645 uint32_t replication,
646 struct GNUNET_TIME_Absolute expiration,
640 uint64_t uid) 647 uint64_t uid)
641{ 648{
642 struct ReplCtx *rc = cls; 649 struct ReplCtx *rc = cls;
@@ -650,12 +657,15 @@ repl_proc (void *cls,
650 PGresult *qret; 657 PGresult *qret;
651 658
652 ret = rc->proc (rc->proc_cls, 659 ret = rc->proc (rc->proc_cls,
653 key, 660 key,
654 size, data, 661 size,
655 type, 662 data,
656 priority, 663 type,
657 anonymity, 664 priority,
658 expiration, uid); 665 anonymity,
666 replication,
667 expiration,
668 uid);
659 if (NULL == key) 669 if (NULL == key)
660 return ret; 670 return ret;
661 qret = GNUNET_PQ_exec_prepared (plugin->dbh, 671 qret = GNUNET_PQ_exec_prepared (plugin->dbh,
@@ -740,19 +750,17 @@ postgres_plugin_get_expiration (void *cls,
740 750
741 751
742/** 752/**
743 * Update the priority for a particular key in the datastore. If 753 * Update the priority, replication and expiration for a particular
744 * the expiration time in value is different than the time found in 754 * unique ID in the datastore. If the expiration time in value is
745 * the datastore, the higher value should be kept. For the 755 * different than the time found in the datastore, the higher value
746 * anonymity level, the lower value is to be used. The specified 756 * should be kept. The specified priority and replication is added
747 * priority should be added to the existing priority, ignoring the 757 * to the existing value.
748 * priority in value.
749 *
750 * Note that it is possible for multiple values to match this put.
751 * In that case, all of the respective values are updated.
752 * 758 *
753 * @param cls our `struct Plugin *` 759 * @param cls our `struct Plugin *`
754 * @param uid unique identifier of the datum 760 * @param uid unique identifier of the datum
755 * @param delta by how much should the priority 761 * @param priority by how much should the priority
762 * change?
763 * @param replication by how much should the replication
756 * change? 764 * change?
757 * @param expire new expiration time should be the 765 * @param expire new expiration time should be the
758 * MAX of any existing expiration time and 766 * MAX of any existing expiration time and
@@ -762,16 +770,18 @@ postgres_plugin_get_expiration (void *cls,
762 */ 770 */
763static void 771static void
764postgres_plugin_update (void *cls, 772postgres_plugin_update (void *cls,
765 uint64_t uid, 773 uint64_t uid,
766 uint32_t delta, 774 uint32_t priority,
775 uint32_t replication,
767 struct GNUNET_TIME_Absolute expire, 776 struct GNUNET_TIME_Absolute expire,
768 PluginUpdateCont cont, 777 PluginUpdateCont cont,
769 void *cont_cls) 778 void *cont_cls)
770{ 779{
771 struct Plugin *plugin = cls; 780 struct Plugin *plugin = cls;
772 uint32_t oid = (uint32_t) uid; 781 uint32_t oid = (uint32_t) uid;
773 struct GNUNET_PQ_QueryParam params[] = { 782 struct GNUNET_PQ_QueryParam params[] = {
774 GNUNET_PQ_query_param_uint32 (&delta), 783 GNUNET_PQ_query_param_uint32 (&priority),
784 GNUNET_PQ_query_param_uint32 (&replication),
775 GNUNET_PQ_query_param_absolute_time (&expire), 785 GNUNET_PQ_query_param_absolute_time (&expire),
776 GNUNET_PQ_query_param_uint32 (&oid), 786 GNUNET_PQ_query_param_uint32 (&oid),
777 GNUNET_PQ_query_param_end 787 GNUNET_PQ_query_param_end