aboutsummaryrefslogtreecommitdiff
path: root/src/psycstore
diff options
context:
space:
mode:
authorJeffrey Burdges <burdges@gnunet.org>2017-06-05 01:15:15 +0200
committerJeffrey Burdges <burdges@gnunet.org>2017-06-05 01:21:16 +0200
commitf78a3083e797a39f80934c6eeb52065b51dda72e (patch)
tree68df358992bafc575e209b2940452d6d46daa1f6 /src/psycstore
parent97baa2271fab56da227f345fd8ba306d67f2c437 (diff)
downloadgnunet-f78a3083e797a39f80934c6eeb52065b51dda72e.tar.gz
gnunet-f78a3083e797a39f80934c6eeb52065b51dda72e.zip
Switch fragment_row to PQ callback form
Christian or Gabor: Please check the comments containing ?? Also, I'm using stack based closures here because the scheduler cannot be invoked. In general, this sort of thing works well in Rust, but looks dangerous in C.
Diffstat (limited to 'src/psycstore')
-rw-r--r--src/psycstore/plugin_psycstore_postgres.c198
1 files changed, 104 insertions, 94 deletions
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c
index f23b75a0c..49dd6ca8e 100644
--- a/src/psycstore/plugin_psycstore_postgres.c
+++ b/src/psycstore/plugin_psycstore_postgres.c
@@ -25,6 +25,7 @@
25 * @author Gabor X Toth 25 * @author Gabor X Toth
26 * @author Christian Grothoff 26 * @author Christian Grothoff
27 * @author Christophe Genevey 27 * @author Christophe Genevey
28 * @author Jeffrey Burdges
28 */ 29 */
29 30
30#include "platform.h" 31#include "platform.h"
@@ -672,59 +673,72 @@ message_add_flags (void *cls,
672} 673}
673 674
674 675
675static int 676/**
676fragment_row (struct Plugin *plugin, 677 * Closure for #fragment_rows.
677 const char *stmt, 678 */
678 PGresult *res, 679struct FragmentRowsContext {
679 GNUNET_PSYCSTORE_FragmentCallback cb, 680 GNUNET_PSYCSTORE_FragmentCallback cb;
680 void *cb_cls, 681 void *cb_cls;
681 uint64_t *returned_fragments) 682
683 uint64_t *returned_fragments;
684
685 /* I preserved this but I do not see the point since
686 * it cannot stop the loop early and gets overwritten ?? */
687 int ret;
688};
689
690
691/**
692 * Callback that retrieves the results of a SELECT statement
693 * reading form the messages table.
694 *
695 * Only passed to GNUNET_PQ_eval_prepared_multi_select and
696 * has type GNUNET_PQ_PostgresResultHandler.
697 *
698 * @param cls closure
699 * @param result the postgres result
700 * @param num_result the number of results in @a result
701 */
702void fragment_rows (void *cls,
703 PGresult *res,
704 unsigned int num_results)
682{ 705{
683 uint32_t hop_counter; 706 struct FragmentRowsContext *c = cls;
684 void *signature = NULL;
685 void *purpose = NULL;
686 size_t signature_size;
687 size_t purpose_size;
688 uint64_t fragment_id;
689 uint64_t fragment_offset;
690 uint64_t message_id;
691 uint64_t group_generation;
692 uint32_t flags;
693 void *buf;
694 size_t buf_size;
695 int ret = GNUNET_SYSERR;
696 struct GNUNET_MULTICAST_MessageHeader *mp;
697 uint32_t msg_flags;
698 struct GNUNET_PQ_ResultSpec results[] = {
699 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
700 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
701 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
702 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
703 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
704 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
705 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
706 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
707 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
708 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
709 GNUNET_PQ_result_spec_end
710 };
711 707
712 if (GNUNET_OK != 708 for (unsigned int i=0;i<num_results;i++)
713 GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK,
714 "PQexecPrepared",
715 stmt))
716 { 709 {
717 LOG (GNUNET_ERROR_TYPE_DEBUG, 710 uint32_t hop_counter;
718 "Failing fragment lookup (postgres error)\n"); 711 void *signature = NULL;
719 return GNUNET_SYSERR; 712 void *purpose = NULL;
720 } 713 size_t signature_size;
714 size_t purpose_size;
715 uint64_t fragment_id;
716 uint64_t fragment_offset;
717 uint64_t message_id;
718 uint64_t group_generation;
719 uint32_t flags;
720 void *buf;
721 size_t buf_size;
722 uint32_t msg_flags;
723 struct GNUNET_PQ_ResultSpec results[] = {
724 GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter),
725 GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size),
726 GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size),
727 GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id),
728 GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset),
729 GNUNET_PQ_result_spec_uint64 ("message_id", &message_id),
730 GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation),
731 GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags),
732 GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags),
733 GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size),
734 GNUNET_PQ_result_spec_end
735 };
736 struct GNUNET_MULTICAST_MessageHeader *mp;
721 737
722 int nrows = PQntuples (res); 738 if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i))
723 for (int row = 0; row < nrows; row++)
724 {
725 if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row))
726 { 739 {
727 break; 740 GNUNET_PQ_cleanup_result(results); /* missing previously, a memory leak?? */
741 break; /* nothing more?? */
728 } 742 }
729 743
730 mp = GNUNET_malloc (sizeof (*mp) + buf_size); 744 mp = GNUNET_malloc (sizeof (*mp) + buf_size);
@@ -733,11 +747,9 @@ fragment_row (struct Plugin *plugin,
733 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); 747 mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
734 mp->hop_counter = htonl (hop_counter); 748 mp->hop_counter = htonl (hop_counter);
735 GNUNET_memcpy (&mp->signature, 749 GNUNET_memcpy (&mp->signature,
736 signature, 750 signature, signature_size);
737 signature_size);
738 GNUNET_memcpy (&mp->purpose, 751 GNUNET_memcpy (&mp->purpose,
739 purpose, 752 purpose, purpose_size);
740 purpose_size);
741 mp->fragment_id = GNUNET_htonll (fragment_id); 753 mp->fragment_id = GNUNET_htonll (fragment_id);
742 mp->fragment_offset = GNUNET_htonll (fragment_offset); 754 mp->fragment_offset = GNUNET_htonll (fragment_offset);
743 mp->message_id = GNUNET_htonll (message_id); 755 mp->message_id = GNUNET_htonll (message_id);
@@ -745,15 +757,12 @@ fragment_row (struct Plugin *plugin,
745 mp->flags = htonl(msg_flags); 757 mp->flags = htonl(msg_flags);
746 758
747 GNUNET_memcpy (&mp[1], 759 GNUNET_memcpy (&mp[1],
748 buf, 760 buf, buf_size);
749 buf_size);
750 GNUNET_PQ_cleanup_result(results); 761 GNUNET_PQ_cleanup_result(results);
751 ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); 762 c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
752 if (NULL != returned_fragments) 763 if (NULL != c->returned_fragments)
753 (*returned_fragments)++; 764 (*c->returned_fragments)++;
754 } 765 }
755
756 return ret;
757} 766}
758 767
759 768
@@ -765,26 +774,19 @@ fragment_select (struct Plugin *plugin,
765 GNUNET_PSYCSTORE_FragmentCallback cb, 774 GNUNET_PSYCSTORE_FragmentCallback cb,
766 void *cb_cls) 775 void *cb_cls)
767{ 776{
768 PGresult *res; 777 /* Stack based closure */
769 int ret = GNUNET_SYSERR; 778 struct FragmentRowsContext frc = {
770 779 .cb = cb,
771 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); 780 .cb_cls = cb_cls,
772 if (GNUNET_YES == 781 .returned_fragments = returned_fragments,
773 GNUNET_POSTGRES_check_result (plugin->dbh, 782 .ret = GNUNET_SYSERR
774 res, 783 };
775 PGRES_TUPLES_OK,
776 "PQexecPrepared", stmt))
777 {
778 if (PQntuples (res) == 0)
779 ret = GNUNET_NO;
780 else
781 {
782 ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments);
783 }
784 PQclear (res);
785 }
786 784
787 return ret; 785 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
786 stmt, params,
787 &fragment_rows, &frc))
788 return GNUNET_SYSERR;
789 return frc.ret; /* GNUNET_OK ?? */
788} 790}
789 791
790/** 792/**
@@ -938,9 +940,7 @@ message_get_fragment (void *cls,
938 GNUNET_PSYCSTORE_FragmentCallback cb, 940 GNUNET_PSYCSTORE_FragmentCallback cb,
939 void *cb_cls) 941 void *cb_cls)
940{ 942{
941 PGresult *res;
942 struct Plugin *plugin = cls; 943 struct Plugin *plugin = cls;
943 int ret = GNUNET_SYSERR;
944 const char *stmt = "select_message_fragment"; 944 const char *stmt = "select_message_fragment";
945 945
946 struct GNUNET_PQ_QueryParam params_select[] = { 946 struct GNUNET_PQ_QueryParam params_select[] = {
@@ -950,21 +950,19 @@ message_get_fragment (void *cls,
950 GNUNET_PQ_query_param_end 950 GNUNET_PQ_query_param_end
951 }; 951 };
952 952
953 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); 953 /* Stack based closure */
954 if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh, 954 struct FragmentRowsContext frc = {
955 res, 955 .cb = cb,
956 PGRES_TUPLES_OK, 956 .cb_cls = cb_cls,
957 "PQexecPrepared", stmt)) 957 .returned_fragments = NULL,
958 { 958 .ret = GNUNET_SYSERR
959 if (PQntuples (res) == 0) 959 };
960 ret = GNUNET_NO;
961 else
962 ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL);
963
964 PQclear (res);
965 }
966 960
967 return ret; 961 if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh,
962 stmt, params_select,
963 &fragment_rows, &frc))
964 return GNUNET_SYSERR;
965 return frc.ret; /* GNUNET_OK ?? */
968} 966}
969 967
970/** 968/**
@@ -1356,6 +1354,11 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
1356 GNUNET_PQ_result_spec_end 1354 GNUNET_PQ_result_spec_end
1357 }; 1355 };
1358 1356
1357/*
1358+ enum GNUNET_PQ_QueryStatus res;
1359+ struct ExtractResultContext erc;
1360*/
1361
1359 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); 1362 res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select);
1360 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, 1363 if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh,
1361 res, 1364 res,
@@ -1382,6 +1385,13 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
1382 PQclear (res); 1385 PQclear (res);
1383 1386
1384 return ret; 1387 return ret;
1388
1389/*
1390 erc.iter = iter;
1391 erc.iter_cls = iter_cls;
1392 res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, stmt, params_select,
1393+ &extract_result_cb, &erc);
1394*/
1385} 1395}
1386 1396
1387 1397