aboutsummaryrefslogtreecommitdiff
path: root/src/datastore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-04-21 15:14:03 +0000
committerChristian Grothoff <christian@grothoff.org>2011-04-21 15:14:03 +0000
commitbbf3c4e04289295df264539e1017f3778f45f97f (patch)
tree1d7d423183446d6ea72c979f264d78b5b26721d9 /src/datastore
parentff754768e374c38fdfe8c677e610431dcc667218 (diff)
downloadgnunet-bbf3c4e04289295df264539e1017f3778f45f97f.tar.gz
gnunet-bbf3c4e04289295df264539e1017f3778f45f97f.zip
fixes
Diffstat (limited to 'src/datastore')
-rw-r--r--src/datastore/datastore.h2
-rw-r--r--src/datastore/datastore_api.c96
2 files changed, 64 insertions, 34 deletions
diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h
index 8fa0ca044..55ca7c8e5 100644
--- a/src/datastore/datastore.h
+++ b/src/datastore/datastore.h
@@ -27,7 +27,7 @@
27#ifndef DATASTORE_H 27#ifndef DATASTORE_H
28#define DATASTORE_H 28#define DATASTORE_H
29 29
30#define DEBUG_DATASTORE GNUNET_NO 30#define DEBUG_DATASTORE GNUNET_YES
31 31
32#include "gnunet_util_lib.h" 32#include "gnunet_util_lib.h"
33 33
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index ef3736950..2bba2e8ee 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -187,7 +187,6 @@ struct GNUNET_DATASTORE_Handle
187 */ 187 */
188 const struct GNUNET_CONFIGURATION_Handle *cfg; 188 const struct GNUNET_CONFIGURATION_Handle *cfg;
189 189
190
191 /** 190 /**
192 * Current connection to the datastore service. 191 * Current connection to the datastore service.
193 */ 192 */
@@ -241,6 +240,12 @@ struct GNUNET_DATASTORE_Handle
241 */ 240 */
242 int in_receive; 241 int in_receive;
243 242
243 /**
244 * We should either receive (and ignore) an 'END' message or force a
245 * disconnect for the next message from the service.
246 */
247 unsigned int expect_end_or_disconnect;
248
244}; 249};
245 250
246 251
@@ -600,7 +605,7 @@ transmit_request (void *cls,
600 h->in_receive = GNUNET_YES; 605 h->in_receive = GNUNET_YES;
601 GNUNET_CLIENT_receive (h->client, 606 GNUNET_CLIENT_receive (h->client,
602 qe->response_proc, 607 qe->response_proc,
603 qe, 608 h,
604 GNUNET_TIME_absolute_get_remaining (qe->timeout)); 609 GNUNET_TIME_absolute_get_remaining (qe->timeout));
605 GNUNET_STATISTICS_update (h->stats, 610 GNUNET_STATISTICS_update (h->stats,
606 gettext_noop ("# bytes sent to datastore"), 611 gettext_noop ("# bytes sent to datastore"),
@@ -710,16 +715,23 @@ process_status_message (void *cls,
710 const struct 715 const struct
711 GNUNET_MessageHeader * msg) 716 GNUNET_MessageHeader * msg)
712{ 717{
713 struct GNUNET_DATASTORE_QueueEntry *qe = cls; 718 struct GNUNET_DATASTORE_Handle *h = cls;
714 struct GNUNET_DATASTORE_Handle *h = qe->h; 719 struct GNUNET_DATASTORE_QueueEntry *qe;
715 struct StatusContext rc = qe->qc.sc; 720 struct StatusContext rc;
716 const struct StatusMessage *sm; 721 const struct StatusMessage *sm;
717 const char *emsg; 722 const char *emsg;
718 int32_t status; 723 int32_t status;
719 int was_transmitted; 724 int was_transmitted;
720 725
721 h->in_receive = GNUNET_NO; 726 h->in_receive = GNUNET_NO;
727 if (NULL == (qe = h->queue_head))
728 {
729 GNUNET_break (0);
730 do_disconnect (h);
731 return;
732 }
722 was_transmitted = qe->was_transmitted; 733 was_transmitted = qe->was_transmitted;
734 rc = qe->qc.sc;
723 if (msg == NULL) 735 if (msg == NULL)
724 { 736 {
725 free_queue_entry (qe); 737 free_queue_entry (qe);
@@ -734,7 +746,6 @@ process_status_message (void *cls,
734 return; 746 return;
735 } 747 }
736 GNUNET_assert (GNUNET_YES == qe->was_transmitted); 748 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
737 GNUNET_assert (h->queue_head == qe);
738 free_queue_entry (qe); 749 free_queue_entry (qe);
739 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) || 750 if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
740 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 751 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
@@ -1169,43 +1180,54 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1169 */ 1180 */
1170static void 1181static void
1171process_result_message (void *cls, 1182process_result_message (void *cls,
1172 const struct GNUNET_MessageHeader * msg) 1183 const struct GNUNET_MessageHeader *msg)
1173{ 1184{
1174 struct GNUNET_DATASTORE_QueueEntry *qe = cls; 1185 struct GNUNET_DATASTORE_Handle *h = cls;
1175 struct GNUNET_DATASTORE_Handle *h = qe->h; 1186 struct GNUNET_DATASTORE_QueueEntry *qe;
1176 struct ResultContext rc = qe->qc.rc; 1187 struct ResultContext rc;
1177 const struct DataMessage *dm; 1188 const struct DataMessage *dm;
1178 int was_transmitted; 1189 int was_transmitted;
1179 1190
1180 h->in_receive = GNUNET_NO; 1191 h->in_receive = GNUNET_NO;
1181 if (msg == NULL) 1192 if (msg == NULL)
1182 { 1193 {
1183 was_transmitted = qe->was_transmitted; 1194 if (NULL != (qe = h->queue_head))
1184 free_queue_entry (qe);
1185 if (was_transmitted == GNUNET_YES)
1186 {
1187 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188 _("Failed to receive response from database.\n"));
1189 do_disconnect (h);
1190 }
1191 else
1192 { 1195 {
1196 was_transmitted = qe->was_transmitted;
1197 free_queue_entry (qe);
1198 rc = qe->qc.rc;
1199 if (was_transmitted == GNUNET_YES)
1200 {
1201 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1202 _("Failed to receive response from database.\n"));
1203 do_disconnect (h);
1204 }
1205 else
1206 {
1193#if DEBUG_DATASTORE 1207#if DEBUG_DATASTORE
1194 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1195 "Request dropped due to finite datastore queue length.\n"); 1209 "Request dropped due to finite datastore queue length.\n");
1196#endif 1210#endif
1211 }
1212 if (rc.iter != NULL)
1213 rc.iter (rc.iter_cls,
1214 NULL, 0, NULL, 0, 0, 0,
1215 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1197 } 1216 }
1198 if (rc.iter != NULL)
1199 rc.iter (rc.iter_cls,
1200 NULL, 0, NULL, 0, 0, 0,
1201 GNUNET_TIME_UNIT_ZERO_ABS, 0);
1202 return; 1217 return;
1203 } 1218 }
1204 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1205 GNUNET_assert (h->queue_head == qe);
1206 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 1219 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1207 { 1220 {
1208 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader)); 1221 GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1222 if (h->expect_end_or_disconnect > 0)
1223 {
1224 h->expect_end_or_disconnect--;
1225 process_queue (h);
1226 return;
1227 }
1228 qe = h->queue_head;
1229 rc = qe->qc.rc;
1230 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1209 free_queue_entry (qe); 1231 free_queue_entry (qe);
1210#if DEBUG_DATASTORE 1232#if DEBUG_DATASTORE
1211 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1233 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1221,6 +1243,16 @@ process_result_message (void *cls,
1221 process_queue (h); 1243 process_queue (h);
1222 return; 1244 return;
1223 } 1245 }
1246 if (h->expect_end_or_disconnect > 0)
1247 {
1248 /* only 'END' allowed, must reconnect */
1249 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1250 do_disconnect (h);
1251 return;
1252 }
1253 qe = h->queue_head;
1254 rc = qe->qc.rc;
1255 GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1224 if ( (ntohs(msg->size) < sizeof(struct DataMessage)) || 1256 if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1225 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) || 1257 (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1226 (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) ) 1258 (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
@@ -1500,11 +1532,10 @@ GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
1500{ 1532{
1501 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head; 1533 struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1502 1534
1503 GNUNET_assert (&process_result_message == qe->response_proc);
1504 h->in_receive = GNUNET_YES; 1535 h->in_receive = GNUNET_YES;
1505 GNUNET_CLIENT_receive (h->client, 1536 GNUNET_CLIENT_receive (h->client,
1506 qe->response_proc, 1537 &process_result_message,
1507 qe, 1538 h,
1508 GNUNET_TIME_absolute_get_remaining (qe->timeout)); 1539 GNUNET_TIME_absolute_get_remaining (qe->timeout));
1509} 1540}
1510 1541
@@ -1531,8 +1562,7 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1531 if (GNUNET_YES == qe->was_transmitted) 1562 if (GNUNET_YES == qe->was_transmitted)
1532 { 1563 {
1533 free_queue_entry (qe); 1564 free_queue_entry (qe);
1534 h->retry_time = GNUNET_TIME_UNIT_ZERO; 1565 h->expect_end_or_disconnect++;
1535 do_disconnect (h);
1536 return; 1566 return;
1537 } 1567 }
1538 free_queue_entry (qe); 1568 free_queue_entry (qe);