diff options
author | Christian Grothoff <christian@grothoff.org> | 2012-11-24 23:45:27 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2012-11-24 23:45:27 +0000 |
commit | c345383f56aa7d7da19fcd129ab0974c16ed92bc (patch) | |
tree | 7de83a21b8562653602430736b7ecf3cdb09eba0 /src/fs/gnunet-service-fs_pr.c | |
parent | 5a9231ed9630c600cc2da70354692d74fe66329d (diff) | |
download | gnunet-c345383f56aa7d7da19fcd129ab0974c16ed92bc.tar.gz gnunet-c345383f56aa7d7da19fcd129ab0974c16ed92bc.zip |
actually using stream for dblock/iblock transfer if possible
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 91 |
1 files changed, 90 insertions, 1 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index f89abdb72..6acc0e2bf 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -30,6 +30,13 @@ | |||
30 | #include "gnunet-service-fs_indexing.h" | 30 | #include "gnunet-service-fs_indexing.h" |
31 | #include "gnunet-service-fs_pe.h" | 31 | #include "gnunet-service-fs_pe.h" |
32 | #include "gnunet-service-fs_pr.h" | 32 | #include "gnunet-service-fs_pr.h" |
33 | #include "gnunet-service-fs_stream.h" | ||
34 | |||
35 | |||
36 | /** | ||
37 | * Desired replication level for GETs. | ||
38 | */ | ||
39 | #define DHT_GET_REPLICATION 5 | ||
33 | 40 | ||
34 | /** | 41 | /** |
35 | * Maximum size of the datastore queue for P2P operations. Needs to | 42 | * Maximum size of the datastore queue for P2P operations. Needs to |
@@ -102,6 +109,11 @@ struct GSF_PendingRequest | |||
102 | struct GNUNET_DHT_GetHandle *gh; | 109 | struct GNUNET_DHT_GetHandle *gh; |
103 | 110 | ||
104 | /** | 111 | /** |
112 | * Stream request handle for this request (or NULL for none). | ||
113 | */ | ||
114 | struct GSF_StreamRequest *stream_request; | ||
115 | |||
116 | /** | ||
105 | * Function to call upon completion of the local get | 117 | * Function to call upon completion of the local get |
106 | * request, or NULL for none. | 118 | * request, or NULL for none. |
107 | */ | 119 | */ |
@@ -624,6 +636,11 @@ clean_request (void *cls, const struct GNUNET_HashCode * key, void *value) | |||
624 | GNUNET_DHT_get_stop (pr->gh); | 636 | GNUNET_DHT_get_stop (pr->gh); |
625 | pr->gh = NULL; | 637 | pr->gh = NULL; |
626 | } | 638 | } |
639 | if (NULL != pr->stream_request) | ||
640 | { | ||
641 | GSF_stream_query_cancel (pr->stream_request); | ||
642 | pr->stream_request = NULL; | ||
643 | } | ||
627 | if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) | 644 | if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) |
628 | { | 645 | { |
629 | GNUNET_SCHEDULER_cancel (pr->warn_task); | 646 | GNUNET_SCHEDULER_cancel (pr->warn_task); |
@@ -676,6 +693,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup) | |||
676 | GNUNET_DHT_get_stop (pr->gh); | 693 | GNUNET_DHT_get_stop (pr->gh); |
677 | pr->gh = NULL; | 694 | pr->gh = NULL; |
678 | } | 695 | } |
696 | if (NULL != pr->stream_request) | ||
697 | { | ||
698 | GSF_stream_query_cancel (pr->stream_request); | ||
699 | pr->stream_request = NULL; | ||
700 | } | ||
679 | if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) | 701 | if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) |
680 | { | 702 | { |
681 | GNUNET_SCHEDULER_cancel (pr->warn_task); | 703 | GNUNET_SCHEDULER_cancel (pr->warn_task); |
@@ -1121,7 +1143,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr) | |||
1121 | pr->gh = | 1143 | pr->gh = |
1122 | GNUNET_DHT_get_start (GSF_dht, | 1144 | GNUNET_DHT_get_start (GSF_dht, |
1123 | pr->public_data.type, &pr->public_data.query, | 1145 | pr->public_data.type, &pr->public_data.query, |
1124 | 5 /* DEFAULT_GET_REPLICATION */ , | 1146 | DHT_GET_REPLICATION, |
1125 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, | 1147 | GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, |
1126 | xquery, xquery_size, &handle_dht_reply, pr); | 1148 | xquery, xquery_size, &handle_dht_reply, pr); |
1127 | if ( (NULL != pr->gh) && | 1149 | if ( (NULL != pr->gh) && |
@@ -1133,6 +1155,72 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr) | |||
1133 | 1155 | ||
1134 | 1156 | ||
1135 | /** | 1157 | /** |
1158 | * Function called with a reply from the stream. | ||
1159 | * | ||
1160 | * @param cls the pending request struct | ||
1161 | * @param type type of the block, ANY on error | ||
1162 | * @param expiration expiration time for the block | ||
1163 | * @param data_size number of bytes in 'data', 0 on error | ||
1164 | * @param data reply block data, NULL on error | ||
1165 | */ | ||
1166 | static void | ||
1167 | stream_reply_proc (void *cls, | ||
1168 | enum GNUNET_BLOCK_Type type, | ||
1169 | struct GNUNET_TIME_Absolute expiration, | ||
1170 | size_t data_size, | ||
1171 | const void *data) | ||
1172 | { | ||
1173 | struct GSF_PendingRequest *pr = cls; | ||
1174 | struct ProcessReplyClosure prq; | ||
1175 | struct GNUNET_HashCode query; | ||
1176 | |||
1177 | pr->stream_request = NULL; | ||
1178 | if (GNUNET_YES != | ||
1179 | GNUNET_BLOCK_get_key (GSF_block_ctx, | ||
1180 | type, | ||
1181 | data, data_size, &query)) | ||
1182 | { | ||
1183 | GNUNET_break_op (0); | ||
1184 | return; | ||
1185 | } | ||
1186 | GNUNET_STATISTICS_update (GSF_stats, | ||
1187 | gettext_noop ("# Replies received from STREAM"), 1, | ||
1188 | GNUNET_NO); | ||
1189 | memset (&prq, 0, sizeof (prq)); | ||
1190 | prq.data = data; | ||
1191 | prq.expiration = expiration; | ||
1192 | /* do not allow migrated content to live longer than 1 year */ | ||
1193 | prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), | ||
1194 | prq.expiration); | ||
1195 | prq.size = data_size; | ||
1196 | prq.type = type; | ||
1197 | process_reply (&prq, &query, pr); | ||
1198 | } | ||
1199 | |||
1200 | |||
1201 | /** | ||
1202 | * Consider downloading via stream (if possible) | ||
1203 | * | ||
1204 | * @param pr the pending request to process | ||
1205 | */ | ||
1206 | void | ||
1207 | GSF_stream_lookup_ (struct GSF_PendingRequest *pr) | ||
1208 | { | ||
1209 | if (0 != pr->public_data.anonymity_level) | ||
1210 | return; | ||
1211 | if (0 == pr->public_data.target) | ||
1212 | return; | ||
1213 | if (NULL != pr->stream_request) | ||
1214 | return; | ||
1215 | pr->stream_request = GSF_stream_query (pr->public_data.target, | ||
1216 | &pr->public_data.query, | ||
1217 | pr->public_data.type, | ||
1218 | &stream_reply_proc, | ||
1219 | pr); | ||
1220 | } | ||
1221 | |||
1222 | |||
1223 | /** | ||
1136 | * Task that issues a warning if the datastore lookup takes too long. | 1224 | * Task that issues a warning if the datastore lookup takes too long. |
1137 | * | 1225 | * |
1138 | * @param cls the 'struct GSF_PendingRequest' | 1226 | * @param cls the 'struct GSF_PendingRequest' |
@@ -1456,6 +1544,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, | |||
1456 | GSF_LocalLookupContinuation cont, void *cont_cls) | 1544 | GSF_LocalLookupContinuation cont, void *cont_cls) |
1457 | { | 1545 | { |
1458 | GNUNET_assert (NULL == pr->gh); | 1546 | GNUNET_assert (NULL == pr->gh); |
1547 | GNUNET_assert (NULL == pr->stream_request); | ||
1459 | GNUNET_assert (NULL == pr->llc_cont); | 1548 | GNUNET_assert (NULL == pr->llc_cont); |
1460 | pr->llc_cont = cont; | 1549 | pr->llc_cont = cont; |
1461 | pr->llc_cont_cls = cont_cls; | 1550 | pr->llc_cont_cls = cont_cls; |