aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-11-24 23:45:27 +0000
committerChristian Grothoff <christian@grothoff.org>2012-11-24 23:45:27 +0000
commitc345383f56aa7d7da19fcd129ab0974c16ed92bc (patch)
tree7de83a21b8562653602430736b7ecf3cdb09eba0 /src/fs/gnunet-service-fs_pr.c
parent5a9231ed9630c600cc2da70354692d74fe66329d (diff)
downloadgnunet-c345383f56aa7d7da19fcd129ab0974c16ed92bc.tar.gz
gnunet-c345383f56aa7d7da19fcd129ab0974c16ed92bc.zip
actually using stream for dblock/iblock transfer if possible
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c91
1 files changed, 90 insertions, 1 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index f89abdb72..6acc0e2bf 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -30,6 +30,13 @@
30#include "gnunet-service-fs_indexing.h" 30#include "gnunet-service-fs_indexing.h"
31#include "gnunet-service-fs_pe.h" 31#include "gnunet-service-fs_pe.h"
32#include "gnunet-service-fs_pr.h" 32#include "gnunet-service-fs_pr.h"
33#include "gnunet-service-fs_stream.h"
34
35
36/**
37 * Desired replication level for GETs.
38 */
39#define DHT_GET_REPLICATION 5
33 40
34/** 41/**
35 * Maximum size of the datastore queue for P2P operations. Needs to 42 * Maximum size of the datastore queue for P2P operations. Needs to
@@ -102,6 +109,11 @@ struct GSF_PendingRequest
102 struct GNUNET_DHT_GetHandle *gh; 109 struct GNUNET_DHT_GetHandle *gh;
103 110
104 /** 111 /**
112 * Stream request handle for this request (or NULL for none).
113 */
114 struct GSF_StreamRequest *stream_request;
115
116 /**
105 * Function to call upon completion of the local get 117 * Function to call upon completion of the local get
106 * request, or NULL for none. 118 * request, or NULL for none.
107 */ 119 */
@@ -624,6 +636,11 @@ clean_request (void *cls, const struct GNUNET_HashCode * key, void *value)
624 GNUNET_DHT_get_stop (pr->gh); 636 GNUNET_DHT_get_stop (pr->gh);
625 pr->gh = NULL; 637 pr->gh = NULL;
626 } 638 }
639 if (NULL != pr->stream_request)
640 {
641 GSF_stream_query_cancel (pr->stream_request);
642 pr->stream_request = NULL;
643 }
627 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) 644 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
628 { 645 {
629 GNUNET_SCHEDULER_cancel (pr->warn_task); 646 GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -676,6 +693,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
676 GNUNET_DHT_get_stop (pr->gh); 693 GNUNET_DHT_get_stop (pr->gh);
677 pr->gh = NULL; 694 pr->gh = NULL;
678 } 695 }
696 if (NULL != pr->stream_request)
697 {
698 GSF_stream_query_cancel (pr->stream_request);
699 pr->stream_request = NULL;
700 }
679 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) 701 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
680 { 702 {
681 GNUNET_SCHEDULER_cancel (pr->warn_task); 703 GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -1121,7 +1143,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1121 pr->gh = 1143 pr->gh =
1122 GNUNET_DHT_get_start (GSF_dht, 1144 GNUNET_DHT_get_start (GSF_dht,
1123 pr->public_data.type, &pr->public_data.query, 1145 pr->public_data.type, &pr->public_data.query,
1124 5 /* DEFAULT_GET_REPLICATION */ , 1146 DHT_GET_REPLICATION,
1125 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1147 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1126 xquery, xquery_size, &handle_dht_reply, pr); 1148 xquery, xquery_size, &handle_dht_reply, pr);
1127 if ( (NULL != pr->gh) && 1149 if ( (NULL != pr->gh) &&
@@ -1133,6 +1155,72 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1133 1155
1134 1156
1135/** 1157/**
1158 * Function called with a reply from the stream.
1159 *
1160 * @param cls the pending request struct
1161 * @param type type of the block, ANY on error
1162 * @param expiration expiration time for the block
1163 * @param data_size number of bytes in 'data', 0 on error
1164 * @param data reply block data, NULL on error
1165 */
1166static void
1167stream_reply_proc (void *cls,
1168 enum GNUNET_BLOCK_Type type,
1169 struct GNUNET_TIME_Absolute expiration,
1170 size_t data_size,
1171 const void *data)
1172{
1173 struct GSF_PendingRequest *pr = cls;
1174 struct ProcessReplyClosure prq;
1175 struct GNUNET_HashCode query;
1176
1177 pr->stream_request = NULL;
1178 if (GNUNET_YES !=
1179 GNUNET_BLOCK_get_key (GSF_block_ctx,
1180 type,
1181 data, data_size, &query))
1182 {
1183 GNUNET_break_op (0);
1184 return;
1185 }
1186 GNUNET_STATISTICS_update (GSF_stats,
1187 gettext_noop ("# Replies received from STREAM"), 1,
1188 GNUNET_NO);
1189 memset (&prq, 0, sizeof (prq));
1190 prq.data = data;
1191 prq.expiration = expiration;
1192 /* do not allow migrated content to live longer than 1 year */
1193 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
1194 prq.expiration);
1195 prq.size = data_size;
1196 prq.type = type;
1197 process_reply (&prq, &query, pr);
1198}
1199
1200
1201/**
1202 * Consider downloading via stream (if possible)
1203 *
1204 * @param pr the pending request to process
1205 */
1206void
1207GSF_stream_lookup_ (struct GSF_PendingRequest *pr)
1208{
1209 if (0 != pr->public_data.anonymity_level)
1210 return;
1211 if (0 == pr->public_data.target)
1212 return;
1213 if (NULL != pr->stream_request)
1214 return;
1215 pr->stream_request = GSF_stream_query (pr->public_data.target,
1216 &pr->public_data.query,
1217 pr->public_data.type,
1218 &stream_reply_proc,
1219 pr);
1220}
1221
1222
1223/**
1136 * Task that issues a warning if the datastore lookup takes too long. 1224 * Task that issues a warning if the datastore lookup takes too long.
1137 * 1225 *
1138 * @param cls the 'struct GSF_PendingRequest' 1226 * @param cls the 'struct GSF_PendingRequest'
@@ -1456,6 +1544,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1456 GSF_LocalLookupContinuation cont, void *cont_cls) 1544 GSF_LocalLookupContinuation cont, void *cont_cls)
1457{ 1545{
1458 GNUNET_assert (NULL == pr->gh); 1546 GNUNET_assert (NULL == pr->gh);
1547 GNUNET_assert (NULL == pr->stream_request);
1459 GNUNET_assert (NULL == pr->llc_cont); 1548 GNUNET_assert (NULL == pr->llc_cont);
1460 pr->llc_cont = cont; 1549 pr->llc_cont = cont;
1461 pr->llc_cont_cls = cont_cls; 1550 pr->llc_cont_cls = cont_cls;