aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/datastore_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r--src/datastore/datastore_api.c191
1 files changed, 79 insertions, 112 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 85e402a4d..ff49c106e 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -367,8 +367,8 @@ timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
367 struct GNUNET_DATASTORE_QueueEntry *qe = cls; 367 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
368 368
369 GNUNET_STATISTICS_update (qe->h->stats, 369 GNUNET_STATISTICS_update (qe->h->stats,
370 gettext_noop ("# queue entry timeouts"), 370 gettext_noop ("# queue entry timeouts"), 1,
371 1, GNUNET_NO); 371 GNUNET_NO);
372 qe->task = GNUNET_SCHEDULER_NO_TASK; 372 qe->task = GNUNET_SCHEDULER_NO_TASK;
373 GNUNET_assert (qe->was_transmitted == GNUNET_NO); 373 GNUNET_assert (qe->was_transmitted == GNUNET_NO);
374#if DEBUG_DATASTORE 374#if DEBUG_DATASTORE
@@ -394,10 +394,8 @@ timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
394 * @return NULL if the queue is full 394 * @return NULL if the queue is full
395 */ 395 */
396static struct GNUNET_DATASTORE_QueueEntry * 396static struct GNUNET_DATASTORE_QueueEntry *
397make_queue_entry (struct GNUNET_DATASTORE_Handle *h, 397make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
398 size_t msize, 398 unsigned int queue_priority, unsigned int max_queue_size,
399 unsigned int queue_priority,
400 unsigned int max_queue_size,
401 struct GNUNET_TIME_Relative timeout, 399 struct GNUNET_TIME_Relative timeout,
402 GNUNET_CLIENT_MessageHandler response_proc, 400 GNUNET_CLIENT_MessageHandler response_proc,
403 const union QueueContext *qc) 401 const union QueueContext *qc)
@@ -408,16 +406,16 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
408 406
409 c = 0; 407 c = 0;
410 pos = h->queue_head; 408 pos = h->queue_head;
411 while ((pos != NULL) && 409 while ((pos != NULL) && (c < max_queue_size) &&
412 (c < max_queue_size) && (pos->priority >= queue_priority)) 410 (pos->priority >= queue_priority))
413 { 411 {
414 c++; 412 c++;
415 pos = pos->next; 413 pos = pos->next;
416 } 414 }
417 if (c >= max_queue_size) 415 if (c >= max_queue_size)
418 { 416 {
419 GNUNET_STATISTICS_update (h->stats, 417 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
420 gettext_noop ("# queue overflows"), 1, GNUNET_NO); 418 GNUNET_NO);
421 return NULL; 419 return NULL;
422 } 420 }
423 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize); 421 ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
@@ -443,8 +441,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
443 pos = h->queue_head; 441 pos = h->queue_head;
444 } 442 }
445 c++; 443 c++;
446 GNUNET_STATISTICS_update (h->stats, 444 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
447 gettext_noop ("# queue entries created"),
448 1, GNUNET_NO); 445 1, GNUNET_NO);
449 GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); 446 GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
450 h->queue_size++; 447 h->queue_size++;
@@ -540,15 +537,14 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
540 return; 537 return;
541 } 538 }
542#if 0 539#if 0
543 GNUNET_STATISTICS_update (stats, 540 GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
544 gettext_noop ("# reconnected to DATASTORE"),
545 1, GNUNET_NO); 541 1, GNUNET_NO);
546#endif 542#endif
547 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); 543 GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
548 h->skip_next_messages = 0; 544 h->skip_next_messages = 0;
549 h->client = NULL; 545 h->client = NULL;
550 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time, 546 h->reconnect_task =
551 &try_reconnect, h); 547 GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
552} 548}
553 549
554 550
@@ -628,12 +624,11 @@ transmit_request (void *cls, size_t size, void *buf)
628 qe->task = GNUNET_SCHEDULER_NO_TASK; 624 qe->task = GNUNET_SCHEDULER_NO_TASK;
629 GNUNET_assert (GNUNET_NO == h->in_receive); 625 GNUNET_assert (GNUNET_NO == h->in_receive);
630 h->in_receive = GNUNET_YES; 626 h->in_receive = GNUNET_YES;
631 GNUNET_CLIENT_receive (h->client, 627 GNUNET_CLIENT_receive (h->client, &receive_cb, h,
632 &receive_cb, 628 GNUNET_TIME_absolute_get_remaining (qe->timeout));
633 h, GNUNET_TIME_absolute_get_remaining (qe->timeout));
634 GNUNET_STATISTICS_update (h->stats, 629 GNUNET_STATISTICS_update (h->stats,
635 gettext_noop ("# bytes sent to datastore"), 630 gettext_noop ("# bytes sent to datastore"), 1,
636 1, GNUNET_NO); 631 GNUNET_NO);
637 return msize; 632 return msize;
638} 633}
639 634
@@ -686,11 +681,11 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
686 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 681 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687 "Queueing %u byte request to DATASTORE\n", qe->message_size); 682 "Queueing %u byte request to DATASTORE\n", qe->message_size);
688#endif 683#endif
689 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, 684 h->th =
690 qe->message_size, 685 GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
691 GNUNET_TIME_absolute_get_remaining 686 GNUNET_TIME_absolute_get_remaining
692 (qe->timeout), GNUNET_YES, 687 (qe->timeout), GNUNET_YES,
693 &transmit_request, h); 688 &transmit_request, h);
694 GNUNET_assert (GNUNET_NO == h->in_receive); 689 GNUNET_assert (GNUNET_NO == h->in_receive);
695 GNUNET_break (NULL != h->th); 690 GNUNET_break (NULL != h->th);
696} 691}
@@ -768,8 +763,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
768 else 763 else
769 process_queue (h); 764 process_queue (h);
770 if (rc.cont != NULL) 765 if (rc.cont != NULL)
771 rc.cont (rc.cont_cls, 766 rc.cont (rc.cont_cls, GNUNET_SYSERR,
772 GNUNET_SYSERR,
773 _("Failed to receive status response from database.")); 767 _("Failed to receive status response from database."));
774 return; 768 return;
775 } 769 }
@@ -782,8 +776,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
782 h->retry_time = GNUNET_TIME_UNIT_ZERO; 776 h->retry_time = GNUNET_TIME_UNIT_ZERO;
783 do_disconnect (h); 777 do_disconnect (h);
784 if (rc.cont != NULL) 778 if (rc.cont != NULL)
785 rc.cont (rc.cont_cls, 779 rc.cont (rc.cont_cls, GNUNET_SYSERR,
786 GNUNET_SYSERR,
787 _("Error reading response from datastore service")); 780 _("Error reading response from datastore service"));
788 return; 781 return;
789 } 782 }
@@ -805,12 +798,12 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
805 emsg = _("Invalid error message received from datastore service"); 798 emsg = _("Invalid error message received from datastore service");
806 } 799 }
807#if DEBUG_DATASTORE 800#if DEBUG_DATASTORE
808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 801 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status,
809 "Received status %d/%s\n", (int) status, emsg); 802 emsg);
810#endif 803#endif
811 GNUNET_STATISTICS_update (h->stats, 804 GNUNET_STATISTICS_update (h->stats,
812 gettext_noop ("# status messages received"), 805 gettext_noop ("# status messages received"), 1,
813 1, GNUNET_NO); 806 GNUNET_NO);
814 h->retry_time.rel_value = 0; 807 h->retry_time.rel_value = 0;
815 process_queue (h); 808 process_queue (h);
816 if (rc.cont != NULL) 809 if (rc.cont != NULL)
@@ -845,18 +838,13 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
845 * (or rather, will already have been invoked) 838 * (or rather, will already have been invoked)
846 */ 839 */
847struct GNUNET_DATASTORE_QueueEntry * 840struct GNUNET_DATASTORE_QueueEntry *
848GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, 841GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
849 uint32_t rid, 842 const GNUNET_HashCode * key, size_t size,
850 const GNUNET_HashCode * key, 843 const void *data, enum GNUNET_BLOCK_Type type,
851 size_t size, 844 uint32_t priority, uint32_t anonymity,
852 const void *data,
853 enum GNUNET_BLOCK_Type type,
854 uint32_t priority,
855 uint32_t anonymity,
856 uint32_t replication, 845 uint32_t replication,
857 struct GNUNET_TIME_Absolute expiration, 846 struct GNUNET_TIME_Absolute expiration,
858 unsigned int queue_priority, 847 unsigned int queue_priority, unsigned int max_queue_size,
859 unsigned int max_queue_size,
860 struct GNUNET_TIME_Relative timeout, 848 struct GNUNET_TIME_Relative timeout,
861 GNUNET_DATASTORE_ContinuationWithStatus cont, 849 GNUNET_DATASTORE_ContinuationWithStatus cont,
862 void *cont_cls) 850 void *cont_cls)
@@ -869,16 +857,14 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
869#if DEBUG_DATASTORE 857#if DEBUG_DATASTORE
870 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 858 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871 "Asked to put %u bytes of data under key `%s' for %llu ms\n", 859 "Asked to put %u bytes of data under key `%s' for %llu ms\n",
872 size, 860 size, GNUNET_h2s (key),
873 GNUNET_h2s (key),
874 GNUNET_TIME_absolute_get_remaining (expiration).rel_value); 861 GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
875#endif 862#endif
876 msize = sizeof (struct DataMessage) + size; 863 msize = sizeof (struct DataMessage) + size;
877 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 864 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
878 qc.sc.cont = cont; 865 qc.sc.cont = cont;
879 qc.sc.cont_cls = cont_cls; 866 qc.sc.cont_cls = cont_cls;
880 qe = make_queue_entry (h, msize, 867 qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
881 queue_priority, max_queue_size, timeout,
882 &process_status_message, &qc); 868 &process_status_message, &qc);
883 if (qe == NULL) 869 if (qe == NULL)
884 { 870 {
@@ -888,8 +874,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
888#endif 874#endif
889 return NULL; 875 return NULL;
890 } 876 }
891 GNUNET_STATISTICS_update (h->stats, 877 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
892 gettext_noop ("# PUT requests executed"),
893 1, GNUNET_NO); 878 1, GNUNET_NO);
894 dm = (struct DataMessage *) &qe[1]; 879 dm = (struct DataMessage *) &qe[1];
895 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); 880 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
@@ -930,10 +915,8 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
930 * (or rather, will already have been invoked) 915 * (or rather, will already have been invoked)
931 */ 916 */
932struct GNUNET_DATASTORE_QueueEntry * 917struct GNUNET_DATASTORE_QueueEntry *
933GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, 918GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
934 uint64_t amount, 919 uint32_t entries, unsigned int queue_priority,
935 uint32_t entries,
936 unsigned int queue_priority,
937 unsigned int max_queue_size, 920 unsigned int max_queue_size,
938 struct GNUNET_TIME_Relative timeout, 921 struct GNUNET_TIME_Relative timeout,
939 GNUNET_DATASTORE_ContinuationWithStatus cont, 922 GNUNET_DATASTORE_ContinuationWithStatus cont,
@@ -952,9 +935,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
952#endif 935#endif
953 qc.sc.cont = cont; 936 qc.sc.cont = cont;
954 qc.sc.cont_cls = cont_cls; 937 qc.sc.cont_cls = cont_cls;
955 qe = make_queue_entry (h, sizeof (struct ReserveMessage), 938 qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
956 queue_priority, max_queue_size, timeout, 939 max_queue_size, timeout, &process_status_message, &qc);
957 &process_status_message, &qc);
958 if (qe == NULL) 940 if (qe == NULL)
959 { 941 {
960#if DEBUG_DATASTORE 942#if DEBUG_DATASTORE
@@ -964,8 +946,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
964 return NULL; 946 return NULL;
965 } 947 }
966 GNUNET_STATISTICS_update (h->stats, 948 GNUNET_STATISTICS_update (h->stats,
967 gettext_noop ("# RESERVE requests executed"), 949 gettext_noop ("# RESERVE requests executed"), 1,
968 1, GNUNET_NO); 950 GNUNET_NO);
969 rm = (struct ReserveMessage *) &qe[1]; 951 rm = (struct ReserveMessage *) &qe[1];
970 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); 952 rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
971 rm->header.size = htons (sizeof (struct ReserveMessage)); 953 rm->header.size = htons (sizeof (struct ReserveMessage));
@@ -999,8 +981,7 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
999 */ 981 */
1000struct GNUNET_DATASTORE_QueueEntry * 982struct GNUNET_DATASTORE_QueueEntry *
1001GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, 983GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1002 uint32_t rid, 984 uint32_t rid, unsigned int queue_priority,
1003 unsigned int queue_priority,
1004 unsigned int max_queue_size, 985 unsigned int max_queue_size,
1005 struct GNUNET_TIME_Relative timeout, 986 struct GNUNET_TIME_Relative timeout,
1006 GNUNET_DATASTORE_ContinuationWithStatus cont, 987 GNUNET_DATASTORE_ContinuationWithStatus cont,
@@ -1059,8 +1040,7 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1059 * (or rather, will already have been invoked) 1040 * (or rather, will already have been invoked)
1060 */ 1041 */
1061struct GNUNET_DATASTORE_QueueEntry * 1042struct GNUNET_DATASTORE_QueueEntry *
1062GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, 1043GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1063 uint64_t uid,
1064 uint32_t priority, 1044 uint32_t priority,
1065 struct GNUNET_TIME_Absolute expiration, 1045 struct GNUNET_TIME_Absolute expiration,
1066 unsigned int queue_priority, 1046 unsigned int queue_priority,
@@ -1078,15 +1058,13 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1078#if DEBUG_DATASTORE 1058#if DEBUG_DATASTORE
1079 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080 "Asked to update entry %llu raising priority by %u and expiration to %llu\n", 1060 "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1081 uid, 1061 uid, (unsigned int) priority,
1082 (unsigned int) priority,
1083 (unsigned long long) expiration.abs_value); 1062 (unsigned long long) expiration.abs_value);
1084#endif 1063#endif
1085 qc.sc.cont = cont; 1064 qc.sc.cont = cont;
1086 qc.sc.cont_cls = cont_cls; 1065 qc.sc.cont_cls = cont_cls;
1087 qe = make_queue_entry (h, sizeof (struct UpdateMessage), 1066 qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1088 queue_priority, max_queue_size, timeout, 1067 max_queue_size, timeout, &process_status_message, &qc);
1089 &process_status_message, &qc);
1090 if (qe == NULL) 1068 if (qe == NULL)
1091 { 1069 {
1092#if DEBUG_DATASTORE 1070#if DEBUG_DATASTORE
@@ -1096,8 +1074,8 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1096 return NULL; 1074 return NULL;
1097 } 1075 }
1098 GNUNET_STATISTICS_update (h->stats, 1076 GNUNET_STATISTICS_update (h->stats,
1099 gettext_noop ("# UPDATE requests executed"), 1077 gettext_noop ("# UPDATE requests executed"), 1,
1100 1, GNUNET_NO); 1078 GNUNET_NO);
1101 um = (struct UpdateMessage *) &qe[1]; 1079 um = (struct UpdateMessage *) &qe[1];
1102 um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE); 1080 um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1103 um->header.size = htons (sizeof (struct UpdateMessage)); 1081 um->header.size = htons (sizeof (struct UpdateMessage));
@@ -1132,10 +1110,8 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1132 */ 1110 */
1133struct GNUNET_DATASTORE_QueueEntry * 1111struct GNUNET_DATASTORE_QueueEntry *
1134GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, 1112GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1135 const GNUNET_HashCode * key, 1113 const GNUNET_HashCode * key, size_t size,
1136 size_t size, 1114 const void *data, unsigned int queue_priority,
1137 const void *data,
1138 unsigned int queue_priority,
1139 unsigned int max_queue_size, 1115 unsigned int max_queue_size,
1140 struct GNUNET_TIME_Relative timeout, 1116 struct GNUNET_TIME_Relative timeout,
1141 GNUNET_DATASTORE_ContinuationWithStatus cont, 1117 GNUNET_DATASTORE_ContinuationWithStatus cont,
@@ -1150,15 +1126,14 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1150 cont = &drop_status_cont; 1126 cont = &drop_status_cont;
1151#if DEBUG_DATASTORE 1127#if DEBUG_DATASTORE
1152 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1128 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153 "Asked to remove %u bytes under key `%s'\n", 1129 "Asked to remove %u bytes under key `%s'\n", size,
1154 size, GNUNET_h2s (key)); 1130 GNUNET_h2s (key));
1155#endif 1131#endif
1156 qc.sc.cont = cont; 1132 qc.sc.cont = cont;
1157 qc.sc.cont_cls = cont_cls; 1133 qc.sc.cont_cls = cont_cls;
1158 msize = sizeof (struct DataMessage) + size; 1134 msize = sizeof (struct DataMessage) + size;
1159 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 1135 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1160 qe = make_queue_entry (h, msize, 1136 qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1161 queue_priority, max_queue_size, timeout,
1162 &process_status_message, &qc); 1137 &process_status_message, &qc);
1163 if (qe == NULL) 1138 if (qe == NULL)
1164 { 1139 {
@@ -1169,8 +1144,8 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1169 return NULL; 1144 return NULL;
1170 } 1145 }
1171 GNUNET_STATISTICS_update (h->stats, 1146 GNUNET_STATISTICS_update (h->stats,
1172 gettext_noop ("# REMOVE requests executed"), 1147 gettext_noop ("# REMOVE requests executed"), 1,
1173 1, GNUNET_NO); 1148 GNUNET_NO);
1174 dm = (struct DataMessage *) &qe[1]; 1149 dm = (struct DataMessage *) &qe[1];
1175 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE); 1150 dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1176 dm->header.size = htons (msize); 1151 dm->header.size = htons (msize);
@@ -1222,8 +1197,8 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1222 process_queue (h); 1197 process_queue (h);
1223 } 1198 }
1224 if (rc.proc != NULL) 1199 if (rc.proc != NULL)
1225 rc.proc (rc.proc_cls, 1200 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1226 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 1201 0);
1227 return; 1202 return;
1228 } 1203 }
1229 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 1204 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
@@ -1239,8 +1214,8 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1239 h->queue_size); 1214 h->queue_size);
1240#endif 1215#endif
1241 if (rc.proc != NULL) 1216 if (rc.proc != NULL)
1242 rc.proc (rc.proc_cls, 1217 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1243 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 1218 0);
1244 h->retry_time.rel_value = 0; 1219 h->retry_time.rel_value = 0;
1245 h->result_count = 0; 1220 h->result_count = 0;
1246 process_queue (h); 1221 process_queue (h);
@@ -1256,8 +1231,8 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1256 h->retry_time = GNUNET_TIME_UNIT_ZERO; 1231 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1257 do_disconnect (h); 1232 do_disconnect (h);
1258 if (rc.proc != NULL) 1233 if (rc.proc != NULL)
1259 rc.proc (rc.proc_cls, 1234 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1260 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 1235 0);
1261 return; 1236 return;
1262 } 1237 }
1263 if ((ntohs (msg->size) < sizeof (struct DataMessage)) || 1238 if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
@@ -1271,30 +1246,25 @@ process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1271 h->retry_time = GNUNET_TIME_UNIT_ZERO; 1246 h->retry_time = GNUNET_TIME_UNIT_ZERO;
1272 do_disconnect (h); 1247 do_disconnect (h);
1273 if (rc.proc != NULL) 1248 if (rc.proc != NULL)
1274 rc.proc (rc.proc_cls, 1249 rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1275 NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); 1250 0);
1276 return; 1251 return;
1277 } 1252 }
1278 GNUNET_STATISTICS_update (h->stats, 1253 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1279 gettext_noop ("# Results received"), 1, GNUNET_NO); 1254 GNUNET_NO);
1280 dm = (const struct DataMessage *) msg; 1255 dm = (const struct DataMessage *) msg;
1281#if DEBUG_DATASTORE 1256#if DEBUG_DATASTORE
1282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283 "Received result %llu with type %u and size %u with key %s\n", 1258 "Received result %llu with type %u and size %u with key %s\n",
1284 (unsigned long long) GNUNET_ntohll (dm->uid), 1259 (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1285 ntohl (dm->type), ntohl (dm->size), GNUNET_h2s (&dm->key)); 1260 ntohl (dm->size), GNUNET_h2s (&dm->key));
1286#endif 1261#endif
1287 free_queue_entry (qe); 1262 free_queue_entry (qe);
1288 h->retry_time.rel_value = 0; 1263 h->retry_time.rel_value = 0;
1289 process_queue (h); 1264 process_queue (h);
1290 if (rc.proc != NULL) 1265 if (rc.proc != NULL)
1291 rc.proc (rc.proc_cls, 1266 rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1292 &dm->key, 1267 ntohl (dm->priority), ntohl (dm->anonymity),
1293 ntohl (dm->size),
1294 &dm[1],
1295 ntohl (dm->type),
1296 ntohl (dm->priority),
1297 ntohl (dm->anonymity),
1298 GNUNET_TIME_absolute_ntoh (dm->expiration), 1268 GNUNET_TIME_absolute_ntoh (dm->expiration),
1299 GNUNET_ntohll (dm->uid)); 1269 GNUNET_ntohll (dm->uid));
1300} 1270}
@@ -1401,8 +1371,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1401#if DEBUG_DATASTORE 1371#if DEBUG_DATASTORE
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n", 1373 "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1404 (unsigned long long) offset, 1374 (unsigned long long) offset, type,
1405 type, (unsigned long long) timeout.rel_value); 1375 (unsigned long long) timeout.rel_value);
1406#endif 1376#endif
1407 qc.rc.proc = proc; 1377 qc.rc.proc = proc;
1408 qc.rc.proc_cls = proc_cls; 1378 qc.rc.proc_cls = proc_cls;
@@ -1453,8 +1423,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1453 * cancel 1423 * cancel
1454 */ 1424 */
1455struct GNUNET_DATASTORE_QueueEntry * 1425struct GNUNET_DATASTORE_QueueEntry *
1456GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, 1426GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1457 uint64_t offset,
1458 const GNUNET_HashCode * key, 1427 const GNUNET_HashCode * key,
1459 enum GNUNET_BLOCK_Type type, 1428 enum GNUNET_BLOCK_Type type,
1460 unsigned int queue_priority, 1429 unsigned int queue_priority,
@@ -1474,19 +1443,17 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1474#endif 1443#endif
1475 qc.rc.proc = proc; 1444 qc.rc.proc = proc;
1476 qc.rc.proc_cls = proc_cls; 1445 qc.rc.proc_cls = proc_cls;
1477 qe = make_queue_entry (h, sizeof (struct GetMessage), 1446 qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1478 queue_priority, max_queue_size, timeout, 1447 max_queue_size, timeout, &process_result_message, &qc);
1479 &process_result_message, &qc);
1480 if (qe == NULL) 1448 if (qe == NULL)
1481 { 1449 {
1482#if DEBUG_DATASTORE 1450#if DEBUG_DATASTORE
1483 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1451 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1484 "Could not queue request for `%s'\n", GNUNET_h2s (key)); 1452 GNUNET_h2s (key));
1485#endif 1453#endif
1486 return NULL; 1454 return NULL;
1487 } 1455 }
1488 GNUNET_STATISTICS_update (h->stats, 1456 GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1489 gettext_noop ("# GET requests executed"),
1490 1, GNUNET_NO); 1457 1, GNUNET_NO);
1491 gm = (struct GetMessage *) &qe[1]; 1458 gm = (struct GetMessage *) &qe[1];
1492 gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET); 1459 gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
@@ -1522,8 +1489,8 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1522 h = qe->h; 1489 h = qe->h;
1523#if DEBUG_DATASTORE 1490#if DEBUG_DATASTORE
1524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525 "Pending DATASTORE request %p cancelled (%d, %d)\n", 1492 "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1526 qe, qe->was_transmitted, h->queue_head == qe); 1493 qe->was_transmitted, h->queue_head == qe);
1527#endif 1494#endif
1528 if (GNUNET_YES == qe->was_transmitted) 1495 if (GNUNET_YES == qe->was_transmitted)
1529 { 1496 {