aboutsummaryrefslogtreecommitdiff
path: root/src/datastore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-04-04 22:02:08 +0000
committerChristian Grothoff <christian@grothoff.org>2010-04-04 22:02:08 +0000
commit4b78e6983509f2078c4f52888187436c0a7b6aeb (patch)
tree9035de10dd6e4e922f83e60a44347730ffbd9551 /src/datastore
parentecd7e5a64fca8678d585ba9f6d6b7e217c44ff33 (diff)
downloadgnunet-4b78e6983509f2078c4f52888187436c0a7b6aeb.tar.gz
gnunet-4b78e6983509f2078c4f52888187436c0a7b6aeb.zip
make more robust to disconnect
Diffstat (limited to 'src/datastore')
-rw-r--r--src/datastore/datastore_api.c72
-rw-r--r--src/datastore/gnunet-service-datastore.c64
2 files changed, 87 insertions, 49 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index cb70cc1f3..fb1939cd9 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -143,23 +143,26 @@ transmit_drop (void *cls,
143void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, 143void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
144 int drop) 144 int drop)
145{ 145{
146 GNUNET_assert (0 == h->message_size); 146 if (h->client != NULL)
147 GNUNET_assert (NULL == h->response_proc); 147 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
148 if ( (GNUNET_YES == drop) && 148 h->client = NULL;
149 (h->client != NULL) ) 149 if (GNUNET_YES == drop)
150 { 150 {
151 if (NULL != 151 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
152 GNUNET_CLIENT_notify_transmit_ready (h->client, 152 if (h->client != NULL)
153 sizeof(struct GNUNET_MessageHeader), 153 {
154 GNUNET_TIME_UNIT_MINUTES, 154 if (NULL !=
155 GNUNET_YES, 155 GNUNET_CLIENT_notify_transmit_ready (h->client,
156 &transmit_drop, 156 sizeof(struct GNUNET_MessageHeader),
157 h)) 157 GNUNET_TIME_UNIT_MINUTES,
158 return; 158 GNUNET_YES,
159 &transmit_drop,
160 h))
161 return;
162 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
163 }
159 GNUNET_break (0); 164 GNUNET_break (0);
160 } 165 }
161 if (h->client != NULL)
162 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
163 GNUNET_ARM_stop_services (h->cfg, h->sched, "datastore", NULL); 166 GNUNET_ARM_stop_services (h->cfg, h->sched, "datastore", NULL);
164 GNUNET_free (h); 167 GNUNET_free (h);
165} 168}
@@ -445,6 +448,22 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
445} 448}
446 449
447 450
451/**
452 * Helper function that will initiate the transmission of a message to
453 * the datastore service. The message must already be prepared and
454 * stored in the buffer at the end of the handle. The message must be
455 * of a type that expects a "DataMessage" in response.
456 *
457 * @param h handle to the service with prepared message
458 * @param cont function to call with result
459 * @param cont_cls closure
460 * @param timeout timeout for the operation
461 */
462static void
463transmit_for_result (struct GNUNET_DATASTORE_Handle *h,
464 GNUNET_DATASTORE_Iterator cont,
465 void *cont_cls,
466 struct GNUNET_TIME_Relative timeout);
448 467
449 468
450/** 469/**
@@ -464,18 +483,35 @@ with_result_response_handler (void *cls,
464 GNUNET_DATASTORE_Iterator cont = h->response_proc; 483 GNUNET_DATASTORE_Iterator cont = h->response_proc;
465 const struct DataMessage *dm; 484 const struct DataMessage *dm;
466 size_t msize; 485 size_t msize;
486 struct GNUNET_TIME_Relative remaining;
467 487
468 h->message_size = 0;
469 if (msg == NULL) 488 if (msg == NULL)
470 { 489 {
490#if DEBUG_DATASTORE
491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
492 "Got disconnected from datastore\n");
493#endif
471 h->response_proc = NULL; 494 h->response_proc = NULL;
472 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); 495 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
473 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg); 496 h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474 cont (h->response_proc_cls, 497 remaining = GNUNET_TIME_absolute_get_remaining (h->timeout);
475 NULL, 0, NULL, 0, 0, 0, 498 if (remaining.value > 0)
476 GNUNET_TIME_UNIT_ZERO_ABS, 0); 499 {
500 transmit_for_result (h,
501 cont,
502 h->response_proc_cls,
503 remaining);
504 }
505 else
506 {
507 h->message_size = 0;
508 cont (h->response_proc_cls,
509 NULL, 0, NULL, 0, 0, 0,
510 GNUNET_TIME_UNIT_ZERO_ABS, 0);
511 }
477 return; 512 return;
478 } 513 }
514 h->message_size = 0;
479 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 515 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
480 { 516 {
481 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); 517 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 9b23f6105..48b45894b 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -870,22 +870,19 @@ handle_put (void *cls,
870 struct ReservationList *pos; 870 struct ReservationList *pos;
871 uint32_t size; 871 uint32_t size;
872 872
873#if DEBUG_DATASTORE 873 if ( (dm == NULL) ||
874 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 874 (ntohl(dm->type) == 0) )
875 "Processing `%s' request\n",
876 "PUT");
877#endif
878 if (ntohl(dm->type) == 0)
879 {
880 GNUNET_break (0);
881 dm = NULL;
882 }
883 if (dm == NULL)
884 { 875 {
885 GNUNET_break (0); 876 GNUNET_break (0);
886 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 877 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
887 return; 878 return;
888 } 879 }
880#if DEBUG_DATASTORE
881 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
882 "Processing `%s' request for `%s'\n",
883 "PUT",
884 GNUNET_h2s (&dm->key));
885#endif
889 rid = ntohl(dm->rid); 886 rid = ntohl(dm->rid);
890 size = ntohl(dm->size); 887 size = ntohl(dm->size);
891 if (rid > 0) 888 if (rid > 0)
@@ -947,17 +944,12 @@ handle_put (void *cls,
947 */ 944 */
948static void 945static void
949handle_get (void *cls, 946handle_get (void *cls,
950 struct GNUNET_SERVER_Client *client, 947 struct GNUNET_SERVER_Client *client,
951 const struct GNUNET_MessageHeader *message) 948 const struct GNUNET_MessageHeader *message)
952{ 949{
953 const struct GetMessage *msg; 950 const struct GetMessage *msg;
954 uint16_t size; 951 uint16_t size;
955 952
956#if DEBUG_DATASTORE
957 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
958 "Processing `%s' request\n",
959 "GET");
960#endif
961 size = ntohs(message->size); 953 size = ntohs(message->size);
962 if ( (size != sizeof(struct GetMessage)) && 954 if ( (size != sizeof(struct GetMessage)) &&
963 (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) ) 955 (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
@@ -966,12 +958,19 @@ handle_get (void *cls,
966 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 958 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
967 return; 959 return;
968 } 960 }
961 msg = (const struct GetMessage*) message;
962#if DEBUG_DATASTORE
963 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964 "Processing `%s' request for `%s' of type %u\n",
965 "GET",
966 GNUNET_h2s (&msg->key),
967 ntohl (msg->type));
968#endif
969 GNUNET_STATISTICS_update (stats, 969 GNUNET_STATISTICS_update (stats,
970 gettext_noop ("# GET requests received"), 970 gettext_noop ("# GET requests received"),
971 1, 971 1,
972 GNUNET_NO); 972 GNUNET_NO);
973 GNUNET_SERVER_client_keep (client); 973 GNUNET_SERVER_client_keep (client);
974 msg = (const struct GetMessage*) message;
975 if ( (size == sizeof(struct GetMessage)) && 974 if ( (size == sizeof(struct GetMessage)) &&
976 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter, 975 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
977 &msg->key)) ) 976 &msg->key)) )
@@ -1017,17 +1016,18 @@ handle_update (void *cls,
1017 int ret; 1016 int ret;
1018 char *emsg; 1017 char *emsg;
1019 1018
1020#if DEBUG_DATASTORE
1021 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022 "Processing `%s' request\n",
1023 "UPDATE");
1024#endif
1025 GNUNET_STATISTICS_update (stats, 1019 GNUNET_STATISTICS_update (stats,
1026 gettext_noop ("# UPDATE requests received"), 1020 gettext_noop ("# UPDATE requests received"),
1027 1, 1021 1,
1028 GNUNET_NO); 1022 GNUNET_NO);
1029 msg = (const struct UpdateMessage*) message; 1023 msg = (const struct UpdateMessage*) message;
1030 emsg = NULL; 1024 emsg = NULL;
1025#if DEBUG_DATASTORE
1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027 "Processing `%s' request for %llu\n",
1028 "UPDATE",
1029 (unsigned long long) GNUNET_ntohll (msg->uid));
1030#endif
1031 ret = plugin->api->update (plugin->api->cls, 1031 ret = plugin->api->update (plugin->api->cls,
1032 GNUNET_ntohll(msg->uid), 1032 GNUNET_ntohll(msg->uid),
1033 (int32_t) ntohl(msg->priority), 1033 (int32_t) ntohl(msg->priority),
@@ -1120,9 +1120,10 @@ remove_callback (void *cls,
1120 rc->found = GNUNET_YES; 1120 rc->found = GNUNET_YES;
1121#if DEBUG_DATASTORE 1121#if DEBUG_DATASTORE
1122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123 "Item %llu matches `%s' request.\n", 1123 "Item %llu matches `%s' request for key `%s'.\n",
1124 (unsigned long long) uid, 1124 (unsigned long long) uid,
1125 "REMOVE"); 1125 "REMOVE",
1126 GNUNET_h2s (key));
1126#endif 1127#endif
1127 GNUNET_STATISTICS_update (stats, 1128 GNUNET_STATISTICS_update (stats,
1128 gettext_noop ("# bytes removed (explicit request)"), 1129 gettext_noop ("# bytes removed (explicit request)"),
@@ -1151,17 +1152,18 @@ handle_remove (void *cls,
1151 GNUNET_HashCode vhash; 1152 GNUNET_HashCode vhash;
1152 struct RemoveContext *rc; 1153 struct RemoveContext *rc;
1153 1154
1154#if DEBUG_DATASTORE
1155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1156 "Processing `%s' request\n",
1157 "REMOVE");
1158#endif
1159 if (dm == NULL) 1155 if (dm == NULL)
1160 { 1156 {
1161 GNUNET_break (0); 1157 GNUNET_break (0);
1162 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 1158 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1163 return; 1159 return;
1164 } 1160 }
1161#if DEBUG_DATASTORE
1162 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163 "Processing `%s' request for `%s'\n",
1164 "REMOVE",
1165 GNUNET_h2s (&dm->key));
1166#endif
1165 GNUNET_STATISTICS_update (stats, 1167 GNUNET_STATISTICS_update (stats,
1166 gettext_noop ("# REMOVE requests received"), 1168 gettext_noop ("# REMOVE requests received"),
1167 1, 1169 1,