diff options
author | Jeffrey Burdges <burdges@gnunet.org> | 2017-06-05 01:15:15 +0200 |
---|---|---|
committer | Jeffrey Burdges <burdges@gnunet.org> | 2017-06-05 01:21:16 +0200 |
commit | f78a3083e797a39f80934c6eeb52065b51dda72e (patch) | |
tree | 68df358992bafc575e209b2940452d6d46daa1f6 /src/psycstore | |
parent | 97baa2271fab56da227f345fd8ba306d67f2c437 (diff) | |
download | gnunet-f78a3083e797a39f80934c6eeb52065b51dda72e.tar.gz gnunet-f78a3083e797a39f80934c6eeb52065b51dda72e.zip |
Switch fragment_row to PQ callback form
Christian or Gabor: Please check the comments containing ??
Also, I'm using stack based closures here because the scheduler
cannot be invoked. In general, this sort of thing works well
in Rust, but looks dangerous in C.
Diffstat (limited to 'src/psycstore')
-rw-r--r-- | src/psycstore/plugin_psycstore_postgres.c | 198 |
1 files changed, 104 insertions, 94 deletions
diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c index f23b75a0c..49dd6ca8e 100644 --- a/src/psycstore/plugin_psycstore_postgres.c +++ b/src/psycstore/plugin_psycstore_postgres.c | |||
@@ -25,6 +25,7 @@ | |||
25 | * @author Gabor X Toth | 25 | * @author Gabor X Toth |
26 | * @author Christian Grothoff | 26 | * @author Christian Grothoff |
27 | * @author Christophe Genevey | 27 | * @author Christophe Genevey |
28 | * @author Jeffrey Burdges | ||
28 | */ | 29 | */ |
29 | 30 | ||
30 | #include "platform.h" | 31 | #include "platform.h" |
@@ -672,59 +673,72 @@ message_add_flags (void *cls, | |||
672 | } | 673 | } |
673 | 674 | ||
674 | 675 | ||
675 | static int | 676 | /** |
676 | fragment_row (struct Plugin *plugin, | 677 | * Closure for #fragment_rows. |
677 | const char *stmt, | 678 | */ |
678 | PGresult *res, | 679 | struct FragmentRowsContext { |
679 | GNUNET_PSYCSTORE_FragmentCallback cb, | 680 | GNUNET_PSYCSTORE_FragmentCallback cb; |
680 | void *cb_cls, | 681 | void *cb_cls; |
681 | uint64_t *returned_fragments) | 682 | |
683 | uint64_t *returned_fragments; | ||
684 | |||
685 | /* I preserved this but I do not see the point since | ||
686 | * it cannot stop the loop early and gets overwritten ?? */ | ||
687 | int ret; | ||
688 | }; | ||
689 | |||
690 | |||
691 | /** | ||
692 | * Callback that retrieves the results of a SELECT statement | ||
693 | * reading form the messages table. | ||
694 | * | ||
695 | * Only passed to GNUNET_PQ_eval_prepared_multi_select and | ||
696 | * has type GNUNET_PQ_PostgresResultHandler. | ||
697 | * | ||
698 | * @param cls closure | ||
699 | * @param result the postgres result | ||
700 | * @param num_result the number of results in @a result | ||
701 | */ | ||
702 | void fragment_rows (void *cls, | ||
703 | PGresult *res, | ||
704 | unsigned int num_results) | ||
682 | { | 705 | { |
683 | uint32_t hop_counter; | 706 | struct FragmentRowsContext *c = cls; |
684 | void *signature = NULL; | ||
685 | void *purpose = NULL; | ||
686 | size_t signature_size; | ||
687 | size_t purpose_size; | ||
688 | uint64_t fragment_id; | ||
689 | uint64_t fragment_offset; | ||
690 | uint64_t message_id; | ||
691 | uint64_t group_generation; | ||
692 | uint32_t flags; | ||
693 | void *buf; | ||
694 | size_t buf_size; | ||
695 | int ret = GNUNET_SYSERR; | ||
696 | struct GNUNET_MULTICAST_MessageHeader *mp; | ||
697 | uint32_t msg_flags; | ||
698 | struct GNUNET_PQ_ResultSpec results[] = { | ||
699 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), | ||
700 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), | ||
701 | GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size), | ||
702 | GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id), | ||
703 | GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), | ||
704 | GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), | ||
705 | GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), | ||
706 | GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags), | ||
707 | GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags), | ||
708 | GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size), | ||
709 | GNUNET_PQ_result_spec_end | ||
710 | }; | ||
711 | 707 | ||
712 | if (GNUNET_OK != | 708 | for (unsigned int i=0;i<num_results;i++) |
713 | GNUNET_POSTGRES_check_result (plugin->dbh, res, PGRES_TUPLES_OK, | ||
714 | "PQexecPrepared", | ||
715 | stmt)) | ||
716 | { | 709 | { |
717 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 710 | uint32_t hop_counter; |
718 | "Failing fragment lookup (postgres error)\n"); | 711 | void *signature = NULL; |
719 | return GNUNET_SYSERR; | 712 | void *purpose = NULL; |
720 | } | 713 | size_t signature_size; |
714 | size_t purpose_size; | ||
715 | uint64_t fragment_id; | ||
716 | uint64_t fragment_offset; | ||
717 | uint64_t message_id; | ||
718 | uint64_t group_generation; | ||
719 | uint32_t flags; | ||
720 | void *buf; | ||
721 | size_t buf_size; | ||
722 | uint32_t msg_flags; | ||
723 | struct GNUNET_PQ_ResultSpec results[] = { | ||
724 | GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), | ||
725 | GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), | ||
726 | GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size), | ||
727 | GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id), | ||
728 | GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), | ||
729 | GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), | ||
730 | GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), | ||
731 | GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags), | ||
732 | GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags), | ||
733 | GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size), | ||
734 | GNUNET_PQ_result_spec_end | ||
735 | }; | ||
736 | struct GNUNET_MULTICAST_MessageHeader *mp; | ||
721 | 737 | ||
722 | int nrows = PQntuples (res); | 738 | if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i)) |
723 | for (int row = 0; row < nrows; row++) | ||
724 | { | ||
725 | if (GNUNET_OK != GNUNET_PQ_extract_result (res, results, row)) | ||
726 | { | 739 | { |
727 | break; | 740 | GNUNET_PQ_cleanup_result(results); /* missing previously, a memory leak?? */ |
741 | break; /* nothing more?? */ | ||
728 | } | 742 | } |
729 | 743 | ||
730 | mp = GNUNET_malloc (sizeof (*mp) + buf_size); | 744 | mp = GNUNET_malloc (sizeof (*mp) + buf_size); |
@@ -733,11 +747,9 @@ fragment_row (struct Plugin *plugin, | |||
733 | mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); | 747 | mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); |
734 | mp->hop_counter = htonl (hop_counter); | 748 | mp->hop_counter = htonl (hop_counter); |
735 | GNUNET_memcpy (&mp->signature, | 749 | GNUNET_memcpy (&mp->signature, |
736 | signature, | 750 | signature, signature_size); |
737 | signature_size); | ||
738 | GNUNET_memcpy (&mp->purpose, | 751 | GNUNET_memcpy (&mp->purpose, |
739 | purpose, | 752 | purpose, purpose_size); |
740 | purpose_size); | ||
741 | mp->fragment_id = GNUNET_htonll (fragment_id); | 753 | mp->fragment_id = GNUNET_htonll (fragment_id); |
742 | mp->fragment_offset = GNUNET_htonll (fragment_offset); | 754 | mp->fragment_offset = GNUNET_htonll (fragment_offset); |
743 | mp->message_id = GNUNET_htonll (message_id); | 755 | mp->message_id = GNUNET_htonll (message_id); |
@@ -745,15 +757,12 @@ fragment_row (struct Plugin *plugin, | |||
745 | mp->flags = htonl(msg_flags); | 757 | mp->flags = htonl(msg_flags); |
746 | 758 | ||
747 | GNUNET_memcpy (&mp[1], | 759 | GNUNET_memcpy (&mp[1], |
748 | buf, | 760 | buf, buf_size); |
749 | buf_size); | ||
750 | GNUNET_PQ_cleanup_result(results); | 761 | GNUNET_PQ_cleanup_result(results); |
751 | ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); | 762 | c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); |
752 | if (NULL != returned_fragments) | 763 | if (NULL != c->returned_fragments) |
753 | (*returned_fragments)++; | 764 | (*c->returned_fragments)++; |
754 | } | 765 | } |
755 | |||
756 | return ret; | ||
757 | } | 766 | } |
758 | 767 | ||
759 | 768 | ||
@@ -765,26 +774,19 @@ fragment_select (struct Plugin *plugin, | |||
765 | GNUNET_PSYCSTORE_FragmentCallback cb, | 774 | GNUNET_PSYCSTORE_FragmentCallback cb, |
766 | void *cb_cls) | 775 | void *cb_cls) |
767 | { | 776 | { |
768 | PGresult *res; | 777 | /* Stack based closure */ |
769 | int ret = GNUNET_SYSERR; | 778 | struct FragmentRowsContext frc = { |
770 | 779 | .cb = cb, | |
771 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params); | 780 | .cb_cls = cb_cls, |
772 | if (GNUNET_YES == | 781 | .returned_fragments = returned_fragments, |
773 | GNUNET_POSTGRES_check_result (plugin->dbh, | 782 | .ret = GNUNET_SYSERR |
774 | res, | 783 | }; |
775 | PGRES_TUPLES_OK, | ||
776 | "PQexecPrepared", stmt)) | ||
777 | { | ||
778 | if (PQntuples (res) == 0) | ||
779 | ret = GNUNET_NO; | ||
780 | else | ||
781 | { | ||
782 | ret = fragment_row (plugin, stmt, res, cb, cb_cls, returned_fragments); | ||
783 | } | ||
784 | PQclear (res); | ||
785 | } | ||
786 | 784 | ||
787 | return ret; | 785 | if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, |
786 | stmt, params, | ||
787 | &fragment_rows, &frc)) | ||
788 | return GNUNET_SYSERR; | ||
789 | return frc.ret; /* GNUNET_OK ?? */ | ||
788 | } | 790 | } |
789 | 791 | ||
790 | /** | 792 | /** |
@@ -938,9 +940,7 @@ message_get_fragment (void *cls, | |||
938 | GNUNET_PSYCSTORE_FragmentCallback cb, | 940 | GNUNET_PSYCSTORE_FragmentCallback cb, |
939 | void *cb_cls) | 941 | void *cb_cls) |
940 | { | 942 | { |
941 | PGresult *res; | ||
942 | struct Plugin *plugin = cls; | 943 | struct Plugin *plugin = cls; |
943 | int ret = GNUNET_SYSERR; | ||
944 | const char *stmt = "select_message_fragment"; | 944 | const char *stmt = "select_message_fragment"; |
945 | 945 | ||
946 | struct GNUNET_PQ_QueryParam params_select[] = { | 946 | struct GNUNET_PQ_QueryParam params_select[] = { |
@@ -950,21 +950,19 @@ message_get_fragment (void *cls, | |||
950 | GNUNET_PQ_query_param_end | 950 | GNUNET_PQ_query_param_end |
951 | }; | 951 | }; |
952 | 952 | ||
953 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | 953 | /* Stack based closure */ |
954 | if (GNUNET_OK == GNUNET_POSTGRES_check_result (plugin->dbh, | 954 | struct FragmentRowsContext frc = { |
955 | res, | 955 | .cb = cb, |
956 | PGRES_TUPLES_OK, | 956 | .cb_cls = cb_cls, |
957 | "PQexecPrepared", stmt)) | 957 | .returned_fragments = NULL, |
958 | { | 958 | .ret = GNUNET_SYSERR |
959 | if (PQntuples (res) == 0) | 959 | }; |
960 | ret = GNUNET_NO; | ||
961 | else | ||
962 | ret = fragment_row (plugin, stmt, res, cb, cb_cls, NULL); | ||
963 | |||
964 | PQclear (res); | ||
965 | } | ||
966 | 960 | ||
967 | return ret; | 961 | if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, |
962 | stmt, params_select, | ||
963 | &fragment_rows, &frc)) | ||
964 | return GNUNET_SYSERR; | ||
965 | return frc.ret; /* GNUNET_OK ?? */ | ||
968 | } | 966 | } |
969 | 967 | ||
970 | /** | 968 | /** |
@@ -1356,6 +1354,11 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ | |||
1356 | GNUNET_PQ_result_spec_end | 1354 | GNUNET_PQ_result_spec_end |
1357 | }; | 1355 | }; |
1358 | 1356 | ||
1357 | /* | ||
1358 | + enum GNUNET_PQ_QueryStatus res; | ||
1359 | + struct ExtractResultContext erc; | ||
1360 | */ | ||
1361 | |||
1359 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); | 1362 | res = GNUNET_PQ_exec_prepared (plugin->dbh, stmt, params_select); |
1360 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, | 1363 | if (GNUNET_OK != GNUNET_POSTGRES_check_result (plugin->dbh, |
1361 | res, | 1364 | res, |
@@ -1382,6 +1385,13 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_ | |||
1382 | PQclear (res); | 1385 | PQclear (res); |
1383 | 1386 | ||
1384 | return ret; | 1387 | return ret; |
1388 | |||
1389 | /* | ||
1390 | erc.iter = iter; | ||
1391 | erc.iter_cls = iter_cls; | ||
1392 | res = GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, stmt, params_select, | ||
1393 | + &extract_result_cb, &erc); | ||
1394 | */ | ||
1385 | } | 1395 | } |
1386 | 1396 | ||
1387 | 1397 | ||