diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-04-21 15:14:03 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-04-21 15:14:03 +0000 |
commit | bbf3c4e04289295df264539e1017f3778f45f97f (patch) | |
tree | 1d7d423183446d6ea72c979f264d78b5b26721d9 /src | |
parent | ff754768e374c38fdfe8c677e610431dcc667218 (diff) | |
download | gnunet-bbf3c4e04289295df264539e1017f3778f45f97f.tar.gz gnunet-bbf3c4e04289295df264539e1017f3778f45f97f.zip |
fixes
Diffstat (limited to 'src')
-rw-r--r-- | src/datastore/datastore.h | 2 | ||||
-rw-r--r-- | src/datastore/datastore_api.c | 96 |
2 files changed, 64 insertions, 34 deletions
diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h index 8fa0ca044..55ca7c8e5 100644 --- a/src/datastore/datastore.h +++ b/src/datastore/datastore.h | |||
@@ -27,7 +27,7 @@ | |||
27 | #ifndef DATASTORE_H | 27 | #ifndef DATASTORE_H |
28 | #define DATASTORE_H | 28 | #define DATASTORE_H |
29 | 29 | ||
30 | #define DEBUG_DATASTORE GNUNET_NO | 30 | #define DEBUG_DATASTORE GNUNET_YES |
31 | 31 | ||
32 | #include "gnunet_util_lib.h" | 32 | #include "gnunet_util_lib.h" |
33 | 33 | ||
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index ef3736950..2bba2e8ee 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -187,7 +187,6 @@ struct GNUNET_DATASTORE_Handle | |||
187 | */ | 187 | */ |
188 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 188 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
189 | 189 | ||
190 | |||
191 | /** | 190 | /** |
192 | * Current connection to the datastore service. | 191 | * Current connection to the datastore service. |
193 | */ | 192 | */ |
@@ -241,6 +240,12 @@ struct GNUNET_DATASTORE_Handle | |||
241 | */ | 240 | */ |
242 | int in_receive; | 241 | int in_receive; |
243 | 242 | ||
243 | /** | ||
244 | * We should either receive (and ignore) an 'END' message or force a | ||
245 | * disconnect for the next message from the service. | ||
246 | */ | ||
247 | unsigned int expect_end_or_disconnect; | ||
248 | |||
244 | }; | 249 | }; |
245 | 250 | ||
246 | 251 | ||
@@ -600,7 +605,7 @@ transmit_request (void *cls, | |||
600 | h->in_receive = GNUNET_YES; | 605 | h->in_receive = GNUNET_YES; |
601 | GNUNET_CLIENT_receive (h->client, | 606 | GNUNET_CLIENT_receive (h->client, |
602 | qe->response_proc, | 607 | qe->response_proc, |
603 | qe, | 608 | h, |
604 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); | 609 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); |
605 | GNUNET_STATISTICS_update (h->stats, | 610 | GNUNET_STATISTICS_update (h->stats, |
606 | gettext_noop ("# bytes sent to datastore"), | 611 | gettext_noop ("# bytes sent to datastore"), |
@@ -710,16 +715,23 @@ process_status_message (void *cls, | |||
710 | const struct | 715 | const struct |
711 | GNUNET_MessageHeader * msg) | 716 | GNUNET_MessageHeader * msg) |
712 | { | 717 | { |
713 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | 718 | struct GNUNET_DATASTORE_Handle *h = cls; |
714 | struct GNUNET_DATASTORE_Handle *h = qe->h; | 719 | struct GNUNET_DATASTORE_QueueEntry *qe; |
715 | struct StatusContext rc = qe->qc.sc; | 720 | struct StatusContext rc; |
716 | const struct StatusMessage *sm; | 721 | const struct StatusMessage *sm; |
717 | const char *emsg; | 722 | const char *emsg; |
718 | int32_t status; | 723 | int32_t status; |
719 | int was_transmitted; | 724 | int was_transmitted; |
720 | 725 | ||
721 | h->in_receive = GNUNET_NO; | 726 | h->in_receive = GNUNET_NO; |
727 | if (NULL == (qe = h->queue_head)) | ||
728 | { | ||
729 | GNUNET_break (0); | ||
730 | do_disconnect (h); | ||
731 | return; | ||
732 | } | ||
722 | was_transmitted = qe->was_transmitted; | 733 | was_transmitted = qe->was_transmitted; |
734 | rc = qe->qc.sc; | ||
723 | if (msg == NULL) | 735 | if (msg == NULL) |
724 | { | 736 | { |
725 | free_queue_entry (qe); | 737 | free_queue_entry (qe); |
@@ -734,7 +746,6 @@ process_status_message (void *cls, | |||
734 | return; | 746 | return; |
735 | } | 747 | } |
736 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | 748 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); |
737 | GNUNET_assert (h->queue_head == qe); | ||
738 | free_queue_entry (qe); | 749 | free_queue_entry (qe); |
739 | if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || | 750 | if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || |
740 | (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) | 751 | (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) |
@@ -1169,43 +1180,54 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, | |||
1169 | */ | 1180 | */ |
1170 | static void | 1181 | static void |
1171 | process_result_message (void *cls, | 1182 | process_result_message (void *cls, |
1172 | const struct GNUNET_MessageHeader * msg) | 1183 | const struct GNUNET_MessageHeader *msg) |
1173 | { | 1184 | { |
1174 | struct GNUNET_DATASTORE_QueueEntry *qe = cls; | 1185 | struct GNUNET_DATASTORE_Handle *h = cls; |
1175 | struct GNUNET_DATASTORE_Handle *h = qe->h; | 1186 | struct GNUNET_DATASTORE_QueueEntry *qe; |
1176 | struct ResultContext rc = qe->qc.rc; | 1187 | struct ResultContext rc; |
1177 | const struct DataMessage *dm; | 1188 | const struct DataMessage *dm; |
1178 | int was_transmitted; | 1189 | int was_transmitted; |
1179 | 1190 | ||
1180 | h->in_receive = GNUNET_NO; | 1191 | h->in_receive = GNUNET_NO; |
1181 | if (msg == NULL) | 1192 | if (msg == NULL) |
1182 | { | 1193 | { |
1183 | was_transmitted = qe->was_transmitted; | 1194 | if (NULL != (qe = h->queue_head)) |
1184 | free_queue_entry (qe); | ||
1185 | if (was_transmitted == GNUNET_YES) | ||
1186 | { | ||
1187 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1188 | _("Failed to receive response from database.\n")); | ||
1189 | do_disconnect (h); | ||
1190 | } | ||
1191 | else | ||
1192 | { | 1195 | { |
1196 | was_transmitted = qe->was_transmitted; | ||
1197 | free_queue_entry (qe); | ||
1198 | rc = qe->qc.rc; | ||
1199 | if (was_transmitted == GNUNET_YES) | ||
1200 | { | ||
1201 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
1202 | _("Failed to receive response from database.\n")); | ||
1203 | do_disconnect (h); | ||
1204 | } | ||
1205 | else | ||
1206 | { | ||
1193 | #if DEBUG_DATASTORE | 1207 | #if DEBUG_DATASTORE |
1194 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1208 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1195 | "Request dropped due to finite datastore queue length.\n"); | 1209 | "Request dropped due to finite datastore queue length.\n"); |
1196 | #endif | 1210 | #endif |
1211 | } | ||
1212 | if (rc.iter != NULL) | ||
1213 | rc.iter (rc.iter_cls, | ||
1214 | NULL, 0, NULL, 0, 0, 0, | ||
1215 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1197 | } | 1216 | } |
1198 | if (rc.iter != NULL) | ||
1199 | rc.iter (rc.iter_cls, | ||
1200 | NULL, 0, NULL, 0, 0, 0, | ||
1201 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
1202 | return; | 1217 | return; |
1203 | } | 1218 | } |
1204 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | ||
1205 | GNUNET_assert (h->queue_head == qe); | ||
1206 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) | 1219 | if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) |
1207 | { | 1220 | { |
1208 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); | 1221 | GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); |
1222 | if (h->expect_end_or_disconnect > 0) | ||
1223 | { | ||
1224 | h->expect_end_or_disconnect--; | ||
1225 | process_queue (h); | ||
1226 | return; | ||
1227 | } | ||
1228 | qe = h->queue_head; | ||
1229 | rc = qe->qc.rc; | ||
1230 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | ||
1209 | free_queue_entry (qe); | 1231 | free_queue_entry (qe); |
1210 | #if DEBUG_DATASTORE | 1232 | #if DEBUG_DATASTORE |
1211 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1233 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1221,6 +1243,16 @@ process_result_message (void *cls, | |||
1221 | process_queue (h); | 1243 | process_queue (h); |
1222 | return; | 1244 | return; |
1223 | } | 1245 | } |
1246 | if (h->expect_end_or_disconnect > 0) | ||
1247 | { | ||
1248 | /* only 'END' allowed, must reconnect */ | ||
1249 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | ||
1250 | do_disconnect (h); | ||
1251 | return; | ||
1252 | } | ||
1253 | qe = h->queue_head; | ||
1254 | rc = qe->qc.rc; | ||
1255 | GNUNET_assert (GNUNET_YES == qe->was_transmitted); | ||
1224 | if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || | 1256 | if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || |
1225 | (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || | 1257 | (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || |
1226 | (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) | 1258 | (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) |
@@ -1500,11 +1532,10 @@ GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h) | |||
1500 | { | 1532 | { |
1501 | struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; | 1533 | struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; |
1502 | 1534 | ||
1503 | GNUNET_assert (&process_result_message == qe->response_proc); | ||
1504 | h->in_receive = GNUNET_YES; | 1535 | h->in_receive = GNUNET_YES; |
1505 | GNUNET_CLIENT_receive (h->client, | 1536 | GNUNET_CLIENT_receive (h->client, |
1506 | qe->response_proc, | 1537 | &process_result_message, |
1507 | qe, | 1538 | h, |
1508 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); | 1539 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); |
1509 | } | 1540 | } |
1510 | 1541 | ||
@@ -1531,8 +1562,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
1531 | if (GNUNET_YES == qe->was_transmitted) | 1562 | if (GNUNET_YES == qe->was_transmitted) |
1532 | { | 1563 | { |
1533 | free_queue_entry (qe); | 1564 | free_queue_entry (qe); |
1534 | h->retry_time = GNUNET_TIME_UNIT_ZERO; | 1565 | h->expect_end_or_disconnect++; |
1535 | do_disconnect (h); | ||
1536 | return; | 1566 | return; |
1537 | } | 1567 | } |
1538 | free_queue_entry (qe); | 1568 | free_queue_entry (qe); |