merchant

Merchant backend to process payments, run by merchants
Log | Files | Refs | Submodules | README | LICENSE

taler-merchant-webhook.c (16963B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2023 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 /**
     17  * @file src/backend/taler-merchant-webhook.c
     18  * @brief Process that runs webhooks triggered by the merchant backend
     19  * @author Priscilla HUANG
     20  */
     21 #include "platform.h"
     22 #include "microhttpd.h"
     23 #include <gnunet/gnunet_util_lib.h>
     24 #include <jansson.h>
     25 #include <pthread.h>
     26 #include "taler/taler_merchant_util.h"
     27 #include "merchantdb_lib.h"
     28 #include "merchantdb_lib.h"
     29 #include <taler/taler_dbevents.h>
     30 #include "merchant-database/delete_pending_webhook.h"
     31 #include "merchant-database/lookup_pending_webhooks.h"
     32 #include "merchant-database/update_pending_webhook.h"
     33 #include "merchant-database/event_listen.h"
     34 #include "merchant-database/preflight.h"
     35 
     36 
     37 struct WorkResponse
     38 {
     39   struct WorkResponse *next;
     40   struct WorkResponse *prev;
     41   struct GNUNET_CURL_Job *job;
     42   uint64_t webhook_pending_serial;
     43   char *body;
     44   struct curl_slist *job_headers;
     45 };
     46 
     47 
     48 static struct WorkResponse *w_head;
     49 
     50 static struct WorkResponse *w_tail;
     51 
     52 static struct GNUNET_DB_EventHandler *event_handler;
     53 
     54 /**
     55  * The merchant's configuration.
     56  */
     57 static const struct GNUNET_CONFIGURATION_Handle *cfg;
     58 
     59 /**
     60  * Our database connection.
     61  */
     62 static struct TALER_MERCHANTDB_PostgresContext *pg;
     63 
     64 /**
     65  * Next task to run, if any.
     66  */
     67 static struct GNUNET_SCHEDULER_Task *task;
     68 
     69 /**
     70  * Handle to the context for interacting with the bank / wire gateway.
     71  */
     72 static struct GNUNET_CURL_Context *ctx;
     73 
     74 /**
     75  * Scheduler context for running the @e ctx.
     76  */
     77 static struct GNUNET_CURL_RescheduleContext *rc;
     78 
     79 /**
     80  * Value to return from main(). 0 on success, non-zero on errors.
     81  */
     82 static int global_ret;
     83 
     84 /**
     85  * #GNUNET_YES if we are in test mode and should exit when idle.
     86  */
     87 static int test_mode;
     88 
     89 
     90 /**
     91  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
     92  *
     93  * @param cls closure
     94  */
     95 static void
     96 shutdown_task (void *cls)
     97 {
     98   struct WorkResponse *w;
     99 
    100   (void) cls;
    101   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    102               "Running shutdown\n");
    103   if (NULL != event_handler)
    104   {
    105     TALER_MERCHANTDB_event_listen_cancel (event_handler);
    106     event_handler = NULL;
    107   }
    108   if (NULL != task)
    109   {
    110     GNUNET_SCHEDULER_cancel (task);
    111     task = NULL;
    112   }
    113   while (NULL != (w = w_head))
    114   {
    115     GNUNET_CONTAINER_DLL_remove (w_head,
    116                                  w_tail,
    117                                  w);
    118     GNUNET_CURL_job_cancel (w->job);
    119     curl_slist_free_all (w->job_headers);
    120     GNUNET_free (w->body);
    121     GNUNET_free (w);
    122   }
    123   if (NULL != pg)
    124   {
    125     TALER_MERCHANTDB_disconnect (pg);
    126     pg = NULL;
    127   }
    128   cfg = NULL;
    129   if (NULL != ctx)
    130   {
    131     GNUNET_CURL_fini (ctx);
    132     ctx = NULL;
    133   }
    134   if (NULL != rc)
    135   {
    136     GNUNET_CURL_gnunet_rc_destroy (rc);
    137     rc = NULL;
    138   }
    139 }
    140 
    141 
    142 /**
    143  * Select webhook to process.
    144  *
    145  * @param cls NULL
    146  */
    147 static void
    148 select_work (void *cls);
    149 
    150 
    151 /**
    152  * This function is used by the function `pending_webhooks_cb`. According to the response code,
    153  * we delete or update the webhook.
    154  *
    155  * @param cls closure
    156  * @param response_code HTTP response code from server, 0 on hard error
    157  * @param body http body of the response
    158  * @param body_size number of bytes in @a body
    159  */
    160 static void
    161 handle_webhook_response (void *cls,
    162                          long response_code,
    163                          const void *body,
    164                          size_t body_size)
    165 {
    166   struct WorkResponse *w = cls;
    167 
    168   (void) body;
    169   (void) body_size;
    170   w->job = NULL;
    171   GNUNET_CONTAINER_DLL_remove (w_head,
    172                                w_tail,
    173                                w);
    174   GNUNET_free (w->body);
    175   curl_slist_free_all (w->job_headers);
    176   if (NULL == w_head)
    177     task = GNUNET_SCHEDULER_add_now (&select_work,
    178                                      NULL);
    179   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    180               "Webhook %llu returned with status %ld\n",
    181               (unsigned long long) w->webhook_pending_serial,
    182               response_code);
    183   if (2 == response_code / 100) /* any 2xx http status code is OK! */
    184   {
    185     enum GNUNET_DB_QueryStatus qs;
    186 
    187     qs = TALER_MERCHANTDB_delete_pending_webhook (pg,
    188                                                   w->webhook_pending_serial);
    189     GNUNET_free (w);
    190     switch (qs)
    191     {
    192     case GNUNET_DB_STATUS_HARD_ERROR:
    193     case GNUNET_DB_STATUS_SOFT_ERROR:
    194       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    195                   "Failed to delete webhook, delete returned: %d\n",
    196                   qs);
    197       global_ret = EXIT_FAILURE;
    198       GNUNET_SCHEDULER_shutdown ();
    199       return;
    200     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    201       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    202                   "Delete returned: %d\n",
    203                   qs);
    204       return;
    205     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    206       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    207                   "Delete returned: %d\n",
    208                   qs);
    209       return;
    210     }
    211     GNUNET_assert (0);
    212   }
    213 
    214   {
    215     struct GNUNET_TIME_Relative next_attempt;
    216     enum GNUNET_DB_QueryStatus qs;
    217     switch (response_code)
    218     {
    219     case MHD_HTTP_BAD_REQUEST:
    220       next_attempt = GNUNET_TIME_UNIT_FOREVER_REL;   // never try again
    221       break;
    222     case MHD_HTTP_INTERNAL_SERVER_ERROR:
    223       next_attempt = GNUNET_TIME_UNIT_MINUTES;
    224       break;
    225     case MHD_HTTP_FORBIDDEN:
    226       next_attempt = GNUNET_TIME_UNIT_MINUTES;
    227       break;
    228     default:
    229       next_attempt = GNUNET_TIME_UNIT_HOURS;
    230       break;
    231     }
    232     qs = TALER_MERCHANTDB_update_pending_webhook (pg,
    233                                                   w->webhook_pending_serial,
    234                                                   GNUNET_TIME_relative_to_absolute (
    235                                                     next_attempt));
    236     GNUNET_free (w);
    237     switch (qs)
    238     {
    239     case GNUNET_DB_STATUS_HARD_ERROR:
    240     case GNUNET_DB_STATUS_SOFT_ERROR:
    241       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    242                   "Failed to update pending webhook to next in %s Rval: %d\n",
    243                   GNUNET_TIME_relative2s (next_attempt,
    244                                           true),
    245                   qs);
    246       global_ret = EXIT_FAILURE;
    247       GNUNET_SCHEDULER_shutdown ();
    248       return;
    249     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    250       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    251                   "Next in %s Rval: %d\n",
    252                   GNUNET_TIME_relative2s (next_attempt, true),
    253                   qs);
    254       return;
    255     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    256       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    257                   "Next in %s Rval: %d\n",
    258                   GNUNET_TIME_relative2s (next_attempt, true),
    259                   qs);
    260       return;
    261     }
    262     GNUNET_assert (0);
    263   }
    264 }
    265 
    266 
    267 /**
    268  * Typically called by `select_work`.
    269  *
    270  * @param cls a `json_t *` JSON array to build
    271  * @param webhook_pending_serial reference to the configured webhook template.
    272  * @param next_attempt is the time we should make the next request to the webhook.
    273  * @param retries how often have we tried this request to the webhook.
    274  * @param url to make request to
    275  * @param http_method use for the webhook
    276  * @param header of the webhook
    277  * @param body of the webhook
    278  */
    279 static void
    280 pending_webhooks_cb (void *cls,
    281                      uint64_t webhook_pending_serial,
    282                      struct GNUNET_TIME_Absolute next_attempt,
    283                      uint32_t retries,
    284                      const char *url,
    285                      const char *http_method,
    286                      const char *header,
    287                      const char *body)
    288 {
    289   struct WorkResponse *w = GNUNET_new (struct WorkResponse);
    290   CURL *eh;
    291   struct curl_slist *job_headers = NULL;
    292 
    293   (void) retries;
    294   (void) next_attempt;
    295   (void) cls;
    296   GNUNET_CONTAINER_DLL_insert (w_head,
    297                                w_tail,
    298                                w);
    299   w->webhook_pending_serial = webhook_pending_serial;
    300   eh = curl_easy_init ();
    301   GNUNET_assert (NULL != eh);
    302   GNUNET_assert (CURLE_OK ==
    303                  curl_easy_setopt (eh,
    304                                    CURLOPT_CUSTOMREQUEST,
    305                                    http_method));
    306   GNUNET_assert (CURLE_OK ==
    307                  curl_easy_setopt (eh,
    308                                    CURLOPT_URL,
    309                                    url));
    310   GNUNET_assert (CURLE_OK ==
    311                  curl_easy_setopt (eh,
    312                                    CURLOPT_VERBOSE,
    313                                    0L));
    314 
    315   /* conversion body data */
    316   if (NULL != body)
    317   {
    318     w->body = GNUNET_strdup (body);
    319     GNUNET_assert (CURLE_OK ==
    320                    curl_easy_setopt (eh,
    321                                      CURLOPT_POSTFIELDS,
    322                                      w->body));
    323   }
    324   /* conversion header to job_headers data */
    325   if (NULL != header)
    326   {
    327     char *header_copy = GNUNET_strdup (header);
    328 
    329     for (const char *tok = strtok (header_copy, "\n");
    330          NULL != tok;
    331          tok = strtok (NULL, "\n"))
    332     {
    333       // extract all Key: value from 'header_copy'!
    334       job_headers = curl_slist_append (job_headers,
    335                                        tok);
    336     }
    337     GNUNET_free (header_copy);
    338     GNUNET_assert (CURLE_OK ==
    339                    curl_easy_setopt (eh,
    340                                      CURLOPT_HTTPHEADER,
    341                                      job_headers));
    342     w->job_headers = job_headers;
    343   }
    344   GNUNET_assert (CURLE_OK ==
    345                  curl_easy_setopt (eh,
    346                                    CURLOPT_MAXREDIRS,
    347                                    5));
    348   GNUNET_assert (CURLE_OK ==
    349                  curl_easy_setopt (eh,
    350                                    CURLOPT_FOLLOWLOCATION,
    351                                    1));
    352 
    353   w->job = GNUNET_CURL_job_add_raw (ctx,
    354                                     eh,
    355                                     job_headers,
    356                                     &handle_webhook_response,
    357                                     w);
    358   if (NULL == w->job)
    359   {
    360     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    361                 "Failed to start the curl job for pending webhook #%llu\n",
    362                 (unsigned long long) webhook_pending_serial);
    363     curl_slist_free_all (w->job_headers);
    364     GNUNET_free (w->body);
    365     GNUNET_CONTAINER_DLL_remove (w_head,
    366                                  w_tail,
    367                                  w);
    368     GNUNET_free (w);
    369     GNUNET_SCHEDULER_shutdown ();
    370     return;
    371   }
    372 }
    373 
    374 
    375 /**
    376  * Function called on events received from Postgres.
    377  *
    378  * @param cls closure, NULL
    379  * @param extra additional event data provided
    380  * @param extra_size number of bytes in @a extra
    381  */
    382 static void
    383 db_notify (void *cls,
    384            const void *extra,
    385            size_t extra_size)
    386 {
    387   (void) cls;
    388   (void) extra;
    389   (void) extra_size;
    390 
    391   GNUNET_assert (NULL != task);
    392   GNUNET_SCHEDULER_cancel (task);
    393   task = GNUNET_SCHEDULER_add_now (&select_work,
    394                                    NULL);
    395 }
    396 
    397 
    398 /**
    399  * Typically called by `select_work`.
    400  *
    401  * @param cls a `json_t *` JSON array to build
    402  * @param webhook_pending_serial reference to the configured webhook template.
    403  * @param next_attempt is the time we should make the next request to the webhook.
    404  * @param retries how often have we tried this request to the webhook.
    405  * @param url to make request to
    406  * @param http_method use for the webhook
    407  * @param header of the webhook
    408  * @param body of the webhook
    409  */
    410 static void
    411 future_webhook_cb (void *cls,
    412                    uint64_t webhook_pending_serial,
    413                    struct GNUNET_TIME_Absolute next_attempt,
    414                    uint32_t retries,
    415                    const char *url,
    416                    const char *http_method,
    417                    const char *header,
    418                    const char *body)
    419 {
    420   (void) webhook_pending_serial;
    421   (void) retries;
    422   (void) url;
    423   (void) http_method;
    424   (void) header;
    425   (void) body;
    426 
    427   task = GNUNET_SCHEDULER_add_at (next_attempt,
    428                                   &select_work,
    429                                   NULL);
    430 }
    431 
    432 
    433 static void
    434 select_work (void *cls)
    435 {
    436   enum GNUNET_DB_QueryStatus qs;
    437   struct GNUNET_TIME_Relative rel;
    438 
    439   (void) cls;
    440   task = NULL;
    441   TALER_MERCHANTDB_preflight (pg);
    442   qs = TALER_MERCHANTDB_lookup_pending_webhooks (pg,
    443                                                  &pending_webhooks_cb,
    444                                                  NULL);
    445   switch (qs)
    446   {
    447   case GNUNET_DB_STATUS_HARD_ERROR:
    448   case GNUNET_DB_STATUS_SOFT_ERROR:
    449     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    450                 "Failed to lookup pending webhooks!\n");
    451     global_ret = EXIT_FAILURE;
    452     GNUNET_SCHEDULER_shutdown ();
    453     return;
    454   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    455     if (test_mode)
    456     {
    457       GNUNET_SCHEDULER_shutdown ();
    458       return;
    459     }
    460     qs = TALER_MERCHANTDB_lookup_future_webhook (pg,
    461                                                  &future_webhook_cb,
    462                                                  NULL);
    463     switch (qs)
    464     {
    465     case GNUNET_DB_STATUS_HARD_ERROR:
    466     case GNUNET_DB_STATUS_SOFT_ERROR:
    467       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    468                   "Failed to lookup future webhook!\n");
    469       global_ret = EXIT_FAILURE;
    470       GNUNET_SCHEDULER_shutdown ();
    471       return;
    472     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    473       return;
    474     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    475       /* wait 5 min */
    476       /* Note: this should not even be necessary if all webhooks
    477          use the events properly... */
    478       rel = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
    479       task = GNUNET_SCHEDULER_add_delayed (rel,
    480                                            &select_work,
    481                                            NULL);
    482       return;
    483     }
    484   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    485   default:
    486     return; // wait for completion, then select more work.
    487   }
    488 }
    489 
    490 
    491 /**
    492  * First task.
    493  *
    494  * @param cls closure, NULL
    495  * @param args remaining command-line arguments
    496  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    497  * @param c configuration
    498  */
    499 static void
    500 run (void *cls,
    501      char *const *args,
    502      const char *cfgfile,
    503      const struct GNUNET_CONFIGURATION_Handle *c)
    504 {
    505   (void) args;
    506   (void) cfgfile;
    507 
    508   cfg = c;
    509   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
    510                                  NULL);
    511   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
    512                           &rc);
    513   rc = GNUNET_CURL_gnunet_rc_create (ctx);
    514   if (NULL == ctx)
    515   {
    516     GNUNET_break (0);
    517     GNUNET_SCHEDULER_shutdown ();
    518     global_ret = EXIT_FAILURE;
    519     return;
    520   }
    521   if (NULL ==
    522       (pg = TALER_MERCHANTDB_connect (cfg)))
    523   {
    524     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    525                 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig!\n");
    526     GNUNET_SCHEDULER_shutdown ();
    527     global_ret = EXIT_FAILURE;
    528     return;
    529   }
    530   {
    531     struct GNUNET_DB_EventHeaderP es = {
    532       .size = htons (sizeof (es)),
    533       .type = htons (TALER_DBEVENT_MERCHANT_WEBHOOK_PENDING)
    534     };
    535 
    536     event_handler = TALER_MERCHANTDB_event_listen (pg,
    537                                                    &es,
    538                                                    GNUNET_TIME_UNIT_FOREVER_REL,
    539                                                    &db_notify,
    540                                                    NULL);
    541   }
    542   GNUNET_assert (NULL == task);
    543   task = GNUNET_SCHEDULER_add_now (&select_work,
    544                                    NULL);
    545 }
    546 
    547 
    548 /**
    549  * The main function of the taler-merchant-webhook
    550  * @param argc number of arguments from the command line
    551  * @param argv command line arguments
    552  * @return 0 ok, 1 on error
    553  */
    554 int
    555 main (int argc,
    556       char *const *argv)
    557 {
    558   struct GNUNET_GETOPT_CommandLineOption options[] = {
    559     GNUNET_GETOPT_option_flag ('t',
    560                                "test",
    561                                "run in test mode and exit when idle",
    562                                &test_mode),
    563     GNUNET_GETOPT_option_timetravel ('T',
    564                                      "timetravel"),
    565     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    566     GNUNET_GETOPT_OPTION_END
    567   };
    568   enum GNUNET_GenericReturnValue ret;
    569 
    570   ret = GNUNET_PROGRAM_run (
    571     TALER_MERCHANT_project_data (),
    572     argc, argv,
    573     "taler-merchant-webhook",
    574     gettext_noop (
    575       "background process that executes webhooks"),
    576     options,
    577     &run, NULL);
    578   if (GNUNET_SYSERR == ret)
    579     return EXIT_INVALIDARGUMENT;
    580   if (GNUNET_NO == ret)
    581     return EXIT_SUCCESS;
    582   return global_ret;
    583 }
    584 
    585 
    586 /* end of taler-merchant-webhook.c */