diff options
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r-- | src/datastore/plugin_datastore_postgres.c | 112 |
1 files changed, 61 insertions, 51 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c index 0376ebb6c..87a7acbdc 100644 --- a/src/datastore/plugin_datastore_postgres.c +++ b/src/datastore/plugin_datastore_postgres.c | |||
@@ -176,9 +176,10 @@ init_connection (struct Plugin *plugin) | |||
176 | return GNUNET_SYSERR; | 176 | return GNUNET_SYSERR; |
177 | } | 177 | } |
178 | PQclear (ret); | 178 | PQclear (ret); |
179 | #define RESULT_COLUMNS "repl, type, prio, anonLevel, expire, hash, value, oid" | ||
179 | if ((GNUNET_OK != | 180 | if ((GNUNET_OK != |
180 | GNUNET_POSTGRES_prepare (plugin->dbh, "get", | 181 | GNUNET_POSTGRES_prepare (plugin->dbh, "get", |
181 | "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " | 182 | "SELECT " RESULT_COLUMNS " FROM gn090 " |
182 | "WHERE oid >= $1::bigint AND " | 183 | "WHERE oid >= $1::bigint AND " |
183 | "(rvalue >= $2 OR 0 = $3::smallint) AND " | 184 | "(rvalue >= $2 OR 0 = $3::smallint) AND " |
184 | "(hash = $4 OR 0 = $5::smallint) AND " | 185 | "(hash = $4 OR 0 = $5::smallint) AND " |
@@ -191,28 +192,33 @@ init_connection (struct Plugin *plugin) | |||
191 | "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) || | 192 | "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", 9)) || |
192 | (GNUNET_OK != | 193 | (GNUNET_OK != |
193 | GNUNET_POSTGRES_prepare (plugin->dbh, "update", | 194 | GNUNET_POSTGRES_prepare (plugin->dbh, "update", |
194 | "UPDATE gn090 SET prio = prio + $1, expire = CASE WHEN expire < $2 THEN $2 ELSE expire END " | 195 | "UPDATE gn090 " |
195 | "WHERE oid = $3", 3)) || | 196 | "SET prio = prio + $1, " |
197 | "repl = repl + $2, " | ||
198 | "expire = CASE WHEN expire < $3 THEN $3 ELSE expire END " | ||
199 | "WHERE oid = $4", 4)) || | ||
196 | (GNUNET_OK != | 200 | (GNUNET_OK != |
197 | GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl", | 201 | GNUNET_POSTGRES_prepare (plugin->dbh, "decrepl", |
198 | "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) " | 202 | "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) " |
199 | "WHERE oid = $1", 1)) || | 203 | "WHERE oid = $1", 1)) || |
200 | (GNUNET_OK != | 204 | (GNUNET_OK != |
201 | GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous", | 205 | GNUNET_POSTGRES_prepare (plugin->dbh, "select_non_anonymous", |
202 | "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " | 206 | "SELECT " RESULT_COLUMNS " FROM gn090 " |
203 | "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint " | 207 | "WHERE anonLevel = 0 AND type = $1 AND oid >= $2::bigint " |
204 | "ORDER BY oid ASC LIMIT 1", | 208 | "ORDER BY oid ASC LIMIT 1", |
205 | 2)) || | 209 | 2)) || |
206 | (GNUNET_OK != | 210 | (GNUNET_OK != |
207 | GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order", | 211 | GNUNET_POSTGRES_prepare (plugin->dbh, "select_expiration_order", |
208 | "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " | 212 | "(SELECT " RESULT_COLUMNS " FROM gn090 " |
209 | "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " "UNION " | 213 | "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) " |
210 | "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " | 214 | "UNION " |
211 | "ORDER BY prio ASC LIMIT 1) " "ORDER BY expire ASC LIMIT 1", | 215 | "(SELECT " RESULT_COLUMNS " FROM gn090 " |
216 | "ORDER BY prio ASC LIMIT 1) " | ||
217 | "ORDER BY expire ASC LIMIT 1", | ||
212 | 1)) || | 218 | 1)) || |
213 | (GNUNET_OK != | 219 | (GNUNET_OK != |
214 | GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order", | 220 | GNUNET_POSTGRES_prepare (plugin->dbh, "select_replication_order", |
215 | "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " | 221 | "SELECT " RESULT_COLUMNS " FROM gn090 " |
216 | "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) || | 222 | "ORDER BY repl DESC,RANDOM() LIMIT 1", 0)) || |
217 | (GNUNET_OK != | 223 | (GNUNET_OK != |
218 | GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) || | 224 | GNUNET_POSTGRES_prepare (plugin->dbh, "delrow", "DELETE FROM gn090 " "WHERE oid=$1", 1)) || |
@@ -371,19 +377,21 @@ process_result (struct Plugin *plugin, | |||
371 | uint32_t rowid; | 377 | uint32_t rowid; |
372 | uint32_t utype; | 378 | uint32_t utype; |
373 | uint32_t anonymity; | 379 | uint32_t anonymity; |
380 | uint32_t replication; | ||
374 | uint32_t priority; | 381 | uint32_t priority; |
375 | size_t size; | 382 | size_t size; |
376 | void *data; | 383 | void *data; |
377 | struct GNUNET_TIME_Absolute expiration_time; | 384 | struct GNUNET_TIME_Absolute expiration_time; |
378 | struct GNUNET_HashCode key; | 385 | struct GNUNET_HashCode key; |
379 | struct GNUNET_PQ_ResultSpec rs[] = { | 386 | struct GNUNET_PQ_ResultSpec rs[] = { |
387 | GNUNET_PQ_result_spec_uint32 ("repl", &replication), | ||
380 | GNUNET_PQ_result_spec_uint32 ("type", &utype), | 388 | GNUNET_PQ_result_spec_uint32 ("type", &utype), |
381 | GNUNET_PQ_result_spec_uint32 ("prio", &priority), | 389 | GNUNET_PQ_result_spec_uint32 ("prio", &priority), |
382 | GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), | 390 | GNUNET_PQ_result_spec_uint32 ("anonLevel", &anonymity), |
383 | GNUNET_PQ_result_spec_uint32 ("oid", &rowid), | ||
384 | GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), | 391 | GNUNET_PQ_result_spec_absolute_time ("expire", &expiration_time), |
385 | GNUNET_PQ_result_spec_auto_from_type ("hash", &key), | 392 | GNUNET_PQ_result_spec_auto_from_type ("hash", &key), |
386 | GNUNET_PQ_result_spec_variable_size ("value", &data, &size), | 393 | GNUNET_PQ_result_spec_variable_size ("value", &data, &size), |
394 | GNUNET_PQ_result_spec_uint32 ("oid", &rowid), | ||
387 | GNUNET_PQ_result_spec_end | 395 | GNUNET_PQ_result_spec_end |
388 | }; | 396 | }; |
389 | 397 | ||
@@ -398,8 +406,7 @@ process_result (struct Plugin *plugin, | |||
398 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | 406 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, |
399 | "datastore-postgres", | 407 | "datastore-postgres", |
400 | "Ending iteration (postgres error)\n"); | 408 | "Ending iteration (postgres error)\n"); |
401 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, | 409 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); |
402 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
403 | return; | 410 | return; |
404 | } | 411 | } |
405 | 412 | ||
@@ -409,16 +416,14 @@ process_result (struct Plugin *plugin, | |||
409 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, | 416 | GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, |
410 | "datastore-postgres", | 417 | "datastore-postgres", |
411 | "Ending iteration (no more results)\n"); | 418 | "Ending iteration (no more results)\n"); |
412 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, | 419 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); |
413 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
414 | PQclear (res); | 420 | PQclear (res); |
415 | return; | 421 | return; |
416 | } | 422 | } |
417 | if (1 != PQntuples (res)) | 423 | if (1 != PQntuples (res)) |
418 | { | 424 | { |
419 | GNUNET_break (0); | 425 | GNUNET_break (0); |
420 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, | 426 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); |
421 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
422 | PQclear (res); | 427 | PQclear (res); |
423 | return; | 428 | return; |
424 | } | 429 | } |
@@ -432,8 +437,7 @@ process_result (struct Plugin *plugin, | |||
432 | GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, | 437 | GNUNET_POSTGRES_delete_by_rowid (plugin->dbh, |
433 | "delrow", | 438 | "delrow", |
434 | rowid); | 439 | rowid); |
435 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, | 440 | proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); |
436 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
437 | return; | 441 | return; |
438 | } | 442 | } |
439 | 443 | ||
@@ -443,14 +447,15 @@ process_result (struct Plugin *plugin, | |||
443 | (unsigned int) size, | 447 | (unsigned int) size, |
444 | (unsigned int) utype); | 448 | (unsigned int) utype); |
445 | iret = proc (proc_cls, | 449 | iret = proc (proc_cls, |
446 | &key, | 450 | &key, |
447 | size, | 451 | size, |
448 | data, | 452 | data, |
449 | (enum GNUNET_BLOCK_Type) utype, | 453 | (enum GNUNET_BLOCK_Type) utype, |
450 | priority, | 454 | priority, |
451 | anonymity, | 455 | anonymity, |
452 | expiration_time, | 456 | replication, |
453 | rowid); | 457 | expiration_time, |
458 | rowid); | ||
454 | PQclear (res); | 459 | PQclear (res); |
455 | if (iret == GNUNET_NO) | 460 | if (iret == GNUNET_NO) |
456 | { | 461 | { |
@@ -620,6 +625,7 @@ struct ReplCtx | |||
620 | * @param type type of the content | 625 | * @param type type of the content |
621 | * @param priority priority of the content | 626 | * @param priority priority of the content |
622 | * @param anonymity anonymity-level for the content | 627 | * @param anonymity anonymity-level for the content |
628 | * @param replication replication-level for the content | ||
623 | * @param expiration expiration time for the content | 629 | * @param expiration expiration time for the content |
624 | * @param uid unique identifier for the datum; | 630 | * @param uid unique identifier for the datum; |
625 | * maybe 0 if no unique identifier is available | 631 | * maybe 0 if no unique identifier is available |
@@ -630,13 +636,14 @@ struct ReplCtx | |||
630 | */ | 636 | */ |
631 | static int | 637 | static int |
632 | repl_proc (void *cls, | 638 | repl_proc (void *cls, |
633 | const struct GNUNET_HashCode *key, | 639 | const struct GNUNET_HashCode *key, |
634 | uint32_t size, | 640 | uint32_t size, |
635 | const void *data, | 641 | const void *data, |
636 | enum GNUNET_BLOCK_Type type, | 642 | enum GNUNET_BLOCK_Type type, |
637 | uint32_t priority, | 643 | uint32_t priority, |
638 | uint32_t anonymity, | 644 | uint32_t anonymity, |
639 | struct GNUNET_TIME_Absolute expiration, | 645 | uint32_t replication, |
646 | struct GNUNET_TIME_Absolute expiration, | ||
640 | uint64_t uid) | 647 | uint64_t uid) |
641 | { | 648 | { |
642 | struct ReplCtx *rc = cls; | 649 | struct ReplCtx *rc = cls; |
@@ -650,12 +657,15 @@ repl_proc (void *cls, | |||
650 | PGresult *qret; | 657 | PGresult *qret; |
651 | 658 | ||
652 | ret = rc->proc (rc->proc_cls, | 659 | ret = rc->proc (rc->proc_cls, |
653 | key, | 660 | key, |
654 | size, data, | 661 | size, |
655 | type, | 662 | data, |
656 | priority, | 663 | type, |
657 | anonymity, | 664 | priority, |
658 | expiration, uid); | 665 | anonymity, |
666 | replication, | ||
667 | expiration, | ||
668 | uid); | ||
659 | if (NULL == key) | 669 | if (NULL == key) |
660 | return ret; | 670 | return ret; |
661 | qret = GNUNET_PQ_exec_prepared (plugin->dbh, | 671 | qret = GNUNET_PQ_exec_prepared (plugin->dbh, |
@@ -740,19 +750,17 @@ postgres_plugin_get_expiration (void *cls, | |||
740 | 750 | ||
741 | 751 | ||
742 | /** | 752 | /** |
743 | * Update the priority for a particular key in the datastore. If | 753 | * Update the priority, replication and expiration for a particular |
744 | * the expiration time in value is different than the time found in | 754 | * unique ID in the datastore. If the expiration time in value is |
745 | * the datastore, the higher value should be kept. For the | 755 | * different than the time found in the datastore, the higher value |
746 | * anonymity level, the lower value is to be used. The specified | 756 | * should be kept. The specified priority and replication is added |
747 | * priority should be added to the existing priority, ignoring the | 757 | * to the existing value. |
748 | * priority in value. | ||
749 | * | ||
750 | * Note that it is possible for multiple values to match this put. | ||
751 | * In that case, all of the respective values are updated. | ||
752 | * | 758 | * |
753 | * @param cls our `struct Plugin *` | 759 | * @param cls our `struct Plugin *` |
754 | * @param uid unique identifier of the datum | 760 | * @param uid unique identifier of the datum |
755 | * @param delta by how much should the priority | 761 | * @param priority by how much should the priority |
762 | * change? | ||
763 | * @param replication by how much should the replication | ||
756 | * change? | 764 | * change? |
757 | * @param expire new expiration time should be the | 765 | * @param expire new expiration time should be the |
758 | * MAX of any existing expiration time and | 766 | * MAX of any existing expiration time and |
@@ -762,16 +770,18 @@ postgres_plugin_get_expiration (void *cls, | |||
762 | */ | 770 | */ |
763 | static void | 771 | static void |
764 | postgres_plugin_update (void *cls, | 772 | postgres_plugin_update (void *cls, |
765 | uint64_t uid, | 773 | uint64_t uid, |
766 | uint32_t delta, | 774 | uint32_t priority, |
775 | uint32_t replication, | ||
767 | struct GNUNET_TIME_Absolute expire, | 776 | struct GNUNET_TIME_Absolute expire, |
768 | PluginUpdateCont cont, | 777 | PluginUpdateCont cont, |
769 | void *cont_cls) | 778 | void *cont_cls) |
770 | { | 779 | { |
771 | struct Plugin *plugin = cls; | 780 | struct Plugin *plugin = cls; |
772 | uint32_t oid = (uint32_t) uid; | 781 | uint32_t oid = (uint32_t) uid; |
773 | struct GNUNET_PQ_QueryParam params[] = { | 782 | struct GNUNET_PQ_QueryParam params[] = { |
774 | GNUNET_PQ_query_param_uint32 (&delta), | 783 | GNUNET_PQ_query_param_uint32 (&priority), |
784 | GNUNET_PQ_query_param_uint32 (&replication), | ||
775 | GNUNET_PQ_query_param_absolute_time (&expire), | 785 | GNUNET_PQ_query_param_absolute_time (&expire), |
776 | GNUNET_PQ_query_param_uint32 (&oid), | 786 | GNUNET_PQ_query_param_uint32 (&oid), |
777 | GNUNET_PQ_query_param_end | 787 | GNUNET_PQ_query_param_end |