diff options
author | Christian Grothoff <christian@grothoff.org> | 2010-04-04 22:02:08 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2010-04-04 22:02:08 +0000 |
commit | 4b78e6983509f2078c4f52888187436c0a7b6aeb (patch) | |
tree | 9035de10dd6e4e922f83e60a44347730ffbd9551 /src/datastore | |
parent | ecd7e5a64fca8678d585ba9f6d6b7e217c44ff33 (diff) | |
download | gnunet-4b78e6983509f2078c4f52888187436c0a7b6aeb.tar.gz gnunet-4b78e6983509f2078c4f52888187436c0a7b6aeb.zip |
make more robust to disconnect
Diffstat (limited to 'src/datastore')
-rw-r--r-- | src/datastore/datastore_api.c | 72 | ||||
-rw-r--r-- | src/datastore/gnunet-service-datastore.c | 64 |
2 files changed, 87 insertions, 49 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index cb70cc1f3..fb1939cd9 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -143,23 +143,26 @@ transmit_drop (void *cls, | |||
143 | void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, | 143 | void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, |
144 | int drop) | 144 | int drop) |
145 | { | 145 | { |
146 | GNUNET_assert (0 == h->message_size); | 146 | if (h->client != NULL) |
147 | GNUNET_assert (NULL == h->response_proc); | 147 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
148 | if ( (GNUNET_YES == drop) && | 148 | h->client = NULL; |
149 | (h->client != NULL) ) | 149 | if (GNUNET_YES == drop) |
150 | { | 150 | { |
151 | if (NULL != | 151 | h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); |
152 | GNUNET_CLIENT_notify_transmit_ready (h->client, | 152 | if (h->client != NULL) |
153 | sizeof(struct GNUNET_MessageHeader), | 153 | { |
154 | GNUNET_TIME_UNIT_MINUTES, | 154 | if (NULL != |
155 | GNUNET_YES, | 155 | GNUNET_CLIENT_notify_transmit_ready (h->client, |
156 | &transmit_drop, | 156 | sizeof(struct GNUNET_MessageHeader), |
157 | h)) | 157 | GNUNET_TIME_UNIT_MINUTES, |
158 | return; | 158 | GNUNET_YES, |
159 | &transmit_drop, | ||
160 | h)) | ||
161 | return; | ||
162 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | ||
163 | } | ||
159 | GNUNET_break (0); | 164 | GNUNET_break (0); |
160 | } | 165 | } |
161 | if (h->client != NULL) | ||
162 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | ||
163 | GNUNET_ARM_stop_services (h->cfg, h->sched, "datastore", NULL); | 166 | GNUNET_ARM_stop_services (h->cfg, h->sched, "datastore", NULL); |
164 | GNUNET_free (h); | 167 | GNUNET_free (h); |
165 | } | 168 | } |
@@ -445,6 +448,22 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, | |||
445 | } | 448 | } |
446 | 449 | ||
447 | 450 | ||
451 | /** | ||
452 | * Helper function that will initiate the transmission of a message to | ||
453 | * the datastore service. The message must already be prepared and | ||
454 | * stored in the buffer at the end of the handle. The message must be | ||
455 | * of a type that expects a "DataMessage" in response. | ||
456 | * | ||
457 | * @param h handle to the service with prepared message | ||
458 | * @param cont function to call with result | ||
459 | * @param cont_cls closure | ||
460 | * @param timeout timeout for the operation | ||
461 | */ | ||
462 | static void | ||
463 | transmit_for_result (struct GNUNET_DATASTORE_Handle *h, | ||
464 | GNUNET_DATASTORE_Iterator cont, | ||
465 | void *cont_cls, | ||
466 | struct GNUNET_TIME_Relative timeout); | ||
448 | 467 | ||
449 | 468 | ||
450 | /** | 469 | /** |
@@ -464,18 +483,35 @@ with_result_response_handler (void *cls, | |||
464 | GNUNET_DATASTORE_Iterator cont = h->response_proc; | 483 | GNUNET_DATASTORE_Iterator cont = h->response_proc; |
465 | const struct DataMessage *dm; | 484 | const struct DataMessage *dm; |
466 | size_t msize; | 485 | size_t msize; |
486 | struct GNUNET_TIME_Relative remaining; | ||
467 | 487 | ||
468 | h->message_size = 0; | ||
469 | if (msg == NULL) | 488 | if (msg == NULL) |
470 | { | 489 | { |
490 | #if DEBUG_DATASTORE | ||
491 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
492 | "Got disconnected from datastore\n"); | ||
493 | #endif | ||
471 | h->response_proc = NULL; | 494 | h->response_proc = NULL; |
472 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); | 495 | GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); |
473 | h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); | 496 | h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); |
474 | cont (h->response_proc_cls, | 497 | remaining = GNUNET_TIME_absolute_get_remaining (h->timeout); |
475 | NULL, 0, NULL, 0, 0, 0, | 498 | if (remaining.value > 0) |
476 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | 499 | { |
500 | transmit_for_result (h, | ||
501 | cont, | ||
502 | h->response_proc_cls, | ||
503 | remaining); | ||
504 | } | ||
505 | else | ||
506 | { | ||
507 | h->message_size = 0; | ||
508 | cont (h->response_proc_cls, | ||
509 | NULL, 0, NULL, 0, 0, 0, | ||
510 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
511 | } | ||
477 | return; | 512 | return; |
478 | } | 513 | } |
514 | h->message_size = 0; | ||
479 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) | 515 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) |
480 | { | 516 | { |
481 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); | 517 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); |
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 9b23f6105..48b45894b 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c | |||
@@ -870,22 +870,19 @@ handle_put (void *cls, | |||
870 | struct ReservationList *pos; | 870 | struct ReservationList *pos; |
871 | uint32_t size; | 871 | uint32_t size; |
872 | 872 | ||
873 | #if DEBUG_DATASTORE | 873 | if ( (dm == NULL) || |
874 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 874 | (ntohl(dm->type) == 0) ) |
875 | "Processing `%s' request\n", | ||
876 | "PUT"); | ||
877 | #endif | ||
878 | if (ntohl(dm->type) == 0) | ||
879 | { | ||
880 | GNUNET_break (0); | ||
881 | dm = NULL; | ||
882 | } | ||
883 | if (dm == NULL) | ||
884 | { | 875 | { |
885 | GNUNET_break (0); | 876 | GNUNET_break (0); |
886 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 877 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
887 | return; | 878 | return; |
888 | } | 879 | } |
880 | #if DEBUG_DATASTORE | ||
881 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
882 | "Processing `%s' request for `%s'\n", | ||
883 | "PUT", | ||
884 | GNUNET_h2s (&dm->key)); | ||
885 | #endif | ||
889 | rid = ntohl(dm->rid); | 886 | rid = ntohl(dm->rid); |
890 | size = ntohl(dm->size); | 887 | size = ntohl(dm->size); |
891 | if (rid > 0) | 888 | if (rid > 0) |
@@ -947,17 +944,12 @@ handle_put (void *cls, | |||
947 | */ | 944 | */ |
948 | static void | 945 | static void |
949 | handle_get (void *cls, | 946 | handle_get (void *cls, |
950 | struct GNUNET_SERVER_Client *client, | 947 | struct GNUNET_SERVER_Client *client, |
951 | const struct GNUNET_MessageHeader *message) | 948 | const struct GNUNET_MessageHeader *message) |
952 | { | 949 | { |
953 | const struct GetMessage *msg; | 950 | const struct GetMessage *msg; |
954 | uint16_t size; | 951 | uint16_t size; |
955 | 952 | ||
956 | #if DEBUG_DATASTORE | ||
957 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
958 | "Processing `%s' request\n", | ||
959 | "GET"); | ||
960 | #endif | ||
961 | size = ntohs(message->size); | 953 | size = ntohs(message->size); |
962 | if ( (size != sizeof(struct GetMessage)) && | 954 | if ( (size != sizeof(struct GetMessage)) && |
963 | (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) ) | 955 | (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) ) |
@@ -966,12 +958,19 @@ handle_get (void *cls, | |||
966 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 958 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
967 | return; | 959 | return; |
968 | } | 960 | } |
961 | msg = (const struct GetMessage*) message; | ||
962 | #if DEBUG_DATASTORE | ||
963 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
964 | "Processing `%s' request for `%s' of type %u\n", | ||
965 | "GET", | ||
966 | GNUNET_h2s (&msg->key), | ||
967 | ntohl (msg->type)); | ||
968 | #endif | ||
969 | GNUNET_STATISTICS_update (stats, | 969 | GNUNET_STATISTICS_update (stats, |
970 | gettext_noop ("# GET requests received"), | 970 | gettext_noop ("# GET requests received"), |
971 | 1, | 971 | 1, |
972 | GNUNET_NO); | 972 | GNUNET_NO); |
973 | GNUNET_SERVER_client_keep (client); | 973 | GNUNET_SERVER_client_keep (client); |
974 | msg = (const struct GetMessage*) message; | ||
975 | if ( (size == sizeof(struct GetMessage)) && | 974 | if ( (size == sizeof(struct GetMessage)) && |
976 | (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, | 975 | (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, |
977 | &msg->key)) ) | 976 | &msg->key)) ) |
@@ -1017,17 +1016,18 @@ handle_update (void *cls, | |||
1017 | int ret; | 1016 | int ret; |
1018 | char *emsg; | 1017 | char *emsg; |
1019 | 1018 | ||
1020 | #if DEBUG_DATASTORE | ||
1021 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1022 | "Processing `%s' request\n", | ||
1023 | "UPDATE"); | ||
1024 | #endif | ||
1025 | GNUNET_STATISTICS_update (stats, | 1019 | GNUNET_STATISTICS_update (stats, |
1026 | gettext_noop ("# UPDATE requests received"), | 1020 | gettext_noop ("# UPDATE requests received"), |
1027 | 1, | 1021 | 1, |
1028 | GNUNET_NO); | 1022 | GNUNET_NO); |
1029 | msg = (const struct UpdateMessage*) message; | 1023 | msg = (const struct UpdateMessage*) message; |
1030 | emsg = NULL; | 1024 | emsg = NULL; |
1025 | #if DEBUG_DATASTORE | ||
1026 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1027 | "Processing `%s' request for %llu\n", | ||
1028 | "UPDATE", | ||
1029 | (unsigned long long) GNUNET_ntohll (msg->uid)); | ||
1030 | #endif | ||
1031 | ret = plugin->api->update (plugin->api->cls, | 1031 | ret = plugin->api->update (plugin->api->cls, |
1032 | GNUNET_ntohll(msg->uid), | 1032 | GNUNET_ntohll(msg->uid), |
1033 | (int32_t) ntohl(msg->priority), | 1033 | (int32_t) ntohl(msg->priority), |
@@ -1120,9 +1120,10 @@ remove_callback (void *cls, | |||
1120 | rc->found = GNUNET_YES; | 1120 | rc->found = GNUNET_YES; |
1121 | #if DEBUG_DATASTORE | 1121 | #if DEBUG_DATASTORE |
1122 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1122 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1123 | "Item %llu matches `%s' request.\n", | 1123 | "Item %llu matches `%s' request for key `%s'.\n", |
1124 | (unsigned long long) uid, | 1124 | (unsigned long long) uid, |
1125 | "REMOVE"); | 1125 | "REMOVE", |
1126 | GNUNET_h2s (key)); | ||
1126 | #endif | 1127 | #endif |
1127 | GNUNET_STATISTICS_update (stats, | 1128 | GNUNET_STATISTICS_update (stats, |
1128 | gettext_noop ("# bytes removed (explicit request)"), | 1129 | gettext_noop ("# bytes removed (explicit request)"), |
@@ -1151,17 +1152,18 @@ handle_remove (void *cls, | |||
1151 | GNUNET_HashCode vhash; | 1152 | GNUNET_HashCode vhash; |
1152 | struct RemoveContext *rc; | 1153 | struct RemoveContext *rc; |
1153 | 1154 | ||
1154 | #if DEBUG_DATASTORE | ||
1155 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1156 | "Processing `%s' request\n", | ||
1157 | "REMOVE"); | ||
1158 | #endif | ||
1159 | if (dm == NULL) | 1155 | if (dm == NULL) |
1160 | { | 1156 | { |
1161 | GNUNET_break (0); | 1157 | GNUNET_break (0); |
1162 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | 1158 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); |
1163 | return; | 1159 | return; |
1164 | } | 1160 | } |
1161 | #if DEBUG_DATASTORE | ||
1162 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1163 | "Processing `%s' request for `%s'\n", | ||
1164 | "REMOVE", | ||
1165 | GNUNET_h2s (&dm->key)); | ||
1166 | #endif | ||
1165 | GNUNET_STATISTICS_update (stats, | 1167 | GNUNET_STATISTICS_update (stats, |
1166 | gettext_noop ("# REMOVE requests received"), | 1168 | gettext_noop ("# REMOVE requests received"), |
1167 | 1, | 1169 | 1, |