aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-11-24 23:45:27 +0000
committerChristian Grothoff <christian@grothoff.org>2012-11-24 23:45:27 +0000
commitc345383f56aa7d7da19fcd129ab0974c16ed92bc (patch)
tree7de83a21b8562653602430736b7ecf3cdb09eba0
parent5a9231ed9630c600cc2da70354692d74fe66329d (diff)
downloadgnunet-c345383f56aa7d7da19fcd129ab0974c16ed92bc.tar.gz
gnunet-c345383f56aa7d7da19fcd129ab0974c16ed92bc.zip
actually using stream for dblock/iblock transfer if possible
-rw-r--r--src/fs/gnunet-service-fs.c24
-rw-r--r--src/fs/gnunet-service-fs_pr.c91
-rw-r--r--src/fs/gnunet-service-fs_pr.h12
-rw-r--r--src/fs/gnunet-service-fs_put.c7
4 files changed, 125 insertions, 9 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 37f020806..b48531d16 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -22,9 +22,6 @@
22 * @file fs/gnunet-service-fs.c 22 * @file fs/gnunet-service-fs.c
23 * @brief gnunet anonymity protocol implementation 23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 *
26 * To use:
27 * - consider re-issue GSF_dht_lookup_ after non-DHT reply received
28 */ 25 */
29#include "platform.h" 26#include "platform.h"
30#include <float.h> 27#include <float.h>
@@ -397,7 +394,26 @@ start_p2p_processing (void *cls, struct GSF_PendingRequest *pr,
397 GSF_pending_request_cancel_ (pr, GNUNET_YES); 394 GSF_pending_request_cancel_ (pr, GNUNET_YES);
398 return; 395 return;
399 } 396 }
400 GSF_dht_lookup_ (pr); 397 if (0 == prd->anonymity_level)
398 {
399 switch (prd->type)
400 {
401 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
402 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
403 /* the above block types MAY be available via 'stream' */
404 GSF_stream_lookup_ (pr);
405 break;
406 case GNUNET_BLOCK_TYPE_FS_KBLOCK:
407 case GNUNET_BLOCK_TYPE_FS_SBLOCK:
408 case GNUNET_BLOCK_TYPE_FS_NBLOCK:
409 /* the above block types are in the DHT */
410 GSF_dht_lookup_ (pr);
411 break;
412 default:
413 GNUNET_break (0);
414 break;
415 }
416 }
401 consider_forwarding (NULL, pr, result); 417 consider_forwarding (NULL, pr, result);
402} 418}
403 419
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index f89abdb72..6acc0e2bf 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -30,6 +30,13 @@
30#include "gnunet-service-fs_indexing.h" 30#include "gnunet-service-fs_indexing.h"
31#include "gnunet-service-fs_pe.h" 31#include "gnunet-service-fs_pe.h"
32#include "gnunet-service-fs_pr.h" 32#include "gnunet-service-fs_pr.h"
33#include "gnunet-service-fs_stream.h"
34
35
36/**
37 * Desired replication level for GETs.
38 */
39#define DHT_GET_REPLICATION 5
33 40
34/** 41/**
35 * Maximum size of the datastore queue for P2P operations. Needs to 42 * Maximum size of the datastore queue for P2P operations. Needs to
@@ -102,6 +109,11 @@ struct GSF_PendingRequest
102 struct GNUNET_DHT_GetHandle *gh; 109 struct GNUNET_DHT_GetHandle *gh;
103 110
104 /** 111 /**
112 * Stream request handle for this request (or NULL for none).
113 */
114 struct GSF_StreamRequest *stream_request;
115
116 /**
105 * Function to call upon completion of the local get 117 * Function to call upon completion of the local get
106 * request, or NULL for none. 118 * request, or NULL for none.
107 */ 119 */
@@ -624,6 +636,11 @@ clean_request (void *cls, const struct GNUNET_HashCode * key, void *value)
624 GNUNET_DHT_get_stop (pr->gh); 636 GNUNET_DHT_get_stop (pr->gh);
625 pr->gh = NULL; 637 pr->gh = NULL;
626 } 638 }
639 if (NULL != pr->stream_request)
640 {
641 GSF_stream_query_cancel (pr->stream_request);
642 pr->stream_request = NULL;
643 }
627 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) 644 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
628 { 645 {
629 GNUNET_SCHEDULER_cancel (pr->warn_task); 646 GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -676,6 +693,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
676 GNUNET_DHT_get_stop (pr->gh); 693 GNUNET_DHT_get_stop (pr->gh);
677 pr->gh = NULL; 694 pr->gh = NULL;
678 } 695 }
696 if (NULL != pr->stream_request)
697 {
698 GSF_stream_query_cancel (pr->stream_request);
699 pr->stream_request = NULL;
700 }
679 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) 701 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
680 { 702 {
681 GNUNET_SCHEDULER_cancel (pr->warn_task); 703 GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -1121,7 +1143,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1121 pr->gh = 1143 pr->gh =
1122 GNUNET_DHT_get_start (GSF_dht, 1144 GNUNET_DHT_get_start (GSF_dht,
1123 pr->public_data.type, &pr->public_data.query, 1145 pr->public_data.type, &pr->public_data.query,
1124 5 /* DEFAULT_GET_REPLICATION */ , 1146 DHT_GET_REPLICATION,
1125 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1147 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1126 xquery, xquery_size, &handle_dht_reply, pr); 1148 xquery, xquery_size, &handle_dht_reply, pr);
1127 if ( (NULL != pr->gh) && 1149 if ( (NULL != pr->gh) &&
@@ -1133,6 +1155,72 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1133 1155
1134 1156
1135/** 1157/**
1158 * Function called with a reply from the stream.
1159 *
1160 * @param cls the pending request struct
1161 * @param type type of the block, ANY on error
1162 * @param expiration expiration time for the block
1163 * @param data_size number of bytes in 'data', 0 on error
1164 * @param data reply block data, NULL on error
1165 */
1166static void
1167stream_reply_proc (void *cls,
1168 enum GNUNET_BLOCK_Type type,
1169 struct GNUNET_TIME_Absolute expiration,
1170 size_t data_size,
1171 const void *data)
1172{
1173 struct GSF_PendingRequest *pr = cls;
1174 struct ProcessReplyClosure prq;
1175 struct GNUNET_HashCode query;
1176
1177 pr->stream_request = NULL;
1178 if (GNUNET_YES !=
1179 GNUNET_BLOCK_get_key (GSF_block_ctx,
1180 type,
1181 data, data_size, &query))
1182 {
1183 GNUNET_break_op (0);
1184 return;
1185 }
1186 GNUNET_STATISTICS_update (GSF_stats,
1187 gettext_noop ("# Replies received from STREAM"), 1,
1188 GNUNET_NO);
1189 memset (&prq, 0, sizeof (prq));
1190 prq.data = data;
1191 prq.expiration = expiration;
1192 /* do not allow migrated content to live longer than 1 year */
1193 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
1194 prq.expiration);
1195 prq.size = data_size;
1196 prq.type = type;
1197 process_reply (&prq, &query, pr);
1198}
1199
1200
1201/**
1202 * Consider downloading via stream (if possible)
1203 *
1204 * @param pr the pending request to process
1205 */
1206void
1207GSF_stream_lookup_ (struct GSF_PendingRequest *pr)
1208{
1209 if (0 != pr->public_data.anonymity_level)
1210 return;
1211 if (0 == pr->public_data.target)
1212 return;
1213 if (NULL != pr->stream_request)
1214 return;
1215 pr->stream_request = GSF_stream_query (pr->public_data.target,
1216 &pr->public_data.query,
1217 pr->public_data.type,
1218 &stream_reply_proc,
1219 pr);
1220}
1221
1222
1223/**
1136 * Task that issues a warning if the datastore lookup takes too long. 1224 * Task that issues a warning if the datastore lookup takes too long.
1137 * 1225 *
1138 * @param cls the 'struct GSF_PendingRequest' 1226 * @param cls the 'struct GSF_PendingRequest'
@@ -1456,6 +1544,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1456 GSF_LocalLookupContinuation cont, void *cont_cls) 1544 GSF_LocalLookupContinuation cont, void *cont_cls)
1457{ 1545{
1458 GNUNET_assert (NULL == pr->gh); 1546 GNUNET_assert (NULL == pr->gh);
1547 GNUNET_assert (NULL == pr->stream_request);
1459 GNUNET_assert (NULL == pr->llc_cont); 1548 GNUNET_assert (NULL == pr->llc_cont);
1460 pr->llc_cont = cont; 1549 pr->llc_cont = cont;
1461 pr->llc_cont_cls = cont_cls; 1550 pr->llc_cont_cls = cont_cls;
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index ab5ce0fab..371aa660b 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -93,8 +93,7 @@ struct GSF_PendingRequestData
93 const struct GNUNET_HashCode *namespace; 93 const struct GNUNET_HashCode *namespace;
94 94
95 /** 95 /**
96 * Identity of a peer hosting the content, only set if 96 * Identity of a peer hosting the content, otherwise NULl.
97 * 'has_target' is GNUNET_YES.
98 * Allocated after struct only if needed. Do not free! 97 * Allocated after struct only if needed. Do not free!
99 */ 98 */
100 const struct GNUNET_PeerIdentity *target; 99 const struct GNUNET_PeerIdentity *target;
@@ -360,6 +359,15 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr);
360 359
361 360
362/** 361/**
362 * Consider downloading via stream (if possible)
363 *
364 * @param pr the pending request to process
365 */
366void
367GSF_stream_lookup_ (struct GSF_PendingRequest *pr);
368
369
370/**
363 * Function to be called after we're done processing 371 * Function to be called after we're done processing
364 * replies from the local lookup. 372 * replies from the local lookup.
365 * 373 *
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
index e4b13edab..49962d314 100644
--- a/src/fs/gnunet-service-fs_put.c
+++ b/src/fs/gnunet-service-fs_put.c
@@ -180,8 +180,11 @@ delay_dht_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
180 * maybe 0 if no unique identifier is available 180 * maybe 0 if no unique identifier is available
181 */ 181 */
182static void 182static void
183process_dht_put_content (void *cls, const struct GNUNET_HashCode * key, size_t size, 183process_dht_put_content (void *cls,
184 const void *data, enum GNUNET_BLOCK_Type type, 184 const struct GNUNET_HashCode * key,
185 size_t size,
186 const void *data,
187 enum GNUNET_BLOCK_Type type,
185 uint32_t priority, uint32_t anonymity, 188 uint32_t priority, uint32_t anonymity,
186 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 189 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
187{ 190{