aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-10 09:36:50 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-10 09:36:50 +0000
commitfaecd2a496b5d356509b0b6b0157db34e8b3188e (patch)
treee5d54c0cb196a05e4a54031f8498e5ffe82bc3a9 /src/fs
parent073c4a9ae448041fdc9a0683fed49d55ae61803e (diff)
downloadgnunet-faecd2a496b5d356509b0b6b0157db34e8b3188e.tar.gz
gnunet-faecd2a496b5d356509b0b6b0157db34e8b3188e.zip
stuff
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs_new.c106
-rw-r--r--src/fs/gnunet-service-fs_pr.c277
-rw-r--r--src/fs/gnunet-service-fs_pr.h49
3 files changed, 365 insertions, 67 deletions
diff --git a/src/fs/gnunet-service-fs_new.c b/src/fs/gnunet-service-fs_new.c
index f0a1513c3..4b22a0c52 100644
--- a/src/fs/gnunet-service-fs_new.c
+++ b/src/fs/gnunet-service-fs_new.c
@@ -27,7 +27,7 @@
27 * - GSF_plan_get_ (!) 27 * - GSF_plan_get_ (!)
28 * - GSF_plan_size_ (?) 28 * - GSF_plan_size_ (?)
29 * - GSF_plan_notify_request_done (!) 29 * - GSF_plan_notify_request_done (!)
30 * - 30 * - consider re-issue GSF_dht_lookup_ after non-DHT reply received
31 * 31 *
32 * 32 *
33 */ 33 */
@@ -224,6 +224,49 @@ plan (struct GSF_ConnectedPeer *cp,
224 224
225 225
226/** 226/**
227 * We have a new request, consider forwarding it to the given
228 * peer.
229 *
230 * @param cls the 'struct GSF_PendingRequest'
231 * @param peer identity of the peer
232 * @param cp handle to the connected peer record
233 * @param perf peer performance data
234 */
235static void
236consider_request_for_forwarding (void *cls,
237 const struct GNUNET_PeerIdentity *peer,
238 struct GSF_ConnectedPeer *cp,
239 const struct GSF_PeerPerformanceData *ppd)
240{
241 struct GSF_PendingRequest *pr = cls;
242
243 plan (cp, pr);
244}
245
246
247/**
248 * Function to be called after we're done processing
249 * replies from the local lookup. If the result status
250 * code indicates that there may be more replies, plan
251 * forwarding the request.
252 *
253 * @param cls closure (NULL)
254 * @param pr the pending request we were processing
255 * @param result final datastore lookup result
256 */
257static void
258consider_forwarding (void *cls,
259 struct GSF_PendingRequest *pr,
260 enum GNUNET_BLOCK_EvaluationResult result)
261{
262 if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
263 return; /* we're done... */
264 GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
265 pr);
266}
267
268
269/**
227 * Handle P2P "GET" request. 270 * Handle P2P "GET" request.
228 * 271 *
229 * @param cls closure, always NULL 272 * @param cls closure, always NULL
@@ -244,31 +287,37 @@ handle_p2p_get (void *cls,
244 287
245 pr = GSF_handle_p2p_query_ (other, message); 288 pr = GSF_handle_p2p_query_ (other, message);
246 if (NULL == pr) 289 if (NULL == pr)
247 return GNUNET_SYSERR; 290 return GNUNET_SYSERR;
248 /* FIXME: local lookup! */ 291 GSF_local_lookup_ (pr,
249 /* FIXME: after local lookup, trigger forwarding/routing! */ 292 &consider_forwarding,
293 NULL);
250 return GNUNET_OK; 294 return GNUNET_OK;
251} 295}
252 296
253 297
254/** 298/**
255 * We have a new request, consider forwarding it to the given 299 * We're done with the local lookup, now consider
256 * peer. 300 * P2P processing (depending on request options and
301 * result status). Also signal that we can now
302 * receive more request information from the client.
257 * 303 *
258 * @param cls the 'struct GSF_PendingRequest' 304 * @param cls the client doing the request ('struct GNUNET_SERVER_Client')
259 * @param peer identity of the peer 305 * @param pr the pending request we were processing
260 * @param cp handle to the connected peer record 306 * @param result final datastore lookup result
261 * @param perf peer performance data
262 */ 307 */
263static void 308static void
264consider_request_for_forwarding (void *cls, 309start_p2p_processing (void *cls,
265 const struct GNUNET_PeerIdentity *peer, 310 struct GSF_PendingRequest *pr,
266 struct GSF_ConnectedPeer *cp, 311 enum GNUNET_BLOCK_EvaluationResult result)
267 const struct GSF_PeerPerformanceData *ppd)
268{ 312{
269 struct GSF_PendingRequest *pr = cls; 313 struct GNUNET_SERVER_Client *client = cls;
270 314
271 plan (cp, pr); 315 GNUNET_SERVER_receive_done (client,
316 GNUNET_OK);
317 if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
318 return; /* we're done... */
319 GSF_dht_lookup_ (pr);
320 consider_forwarding (NULL, pr, result);
272} 321}
273 322
274 323
@@ -292,28 +341,9 @@ handle_start_search (void *cls,
292 /* 'GNUNET_SERVER_receive_done was already called! */ 341 /* 'GNUNET_SERVER_receive_done was already called! */
293 return; 342 return;
294 } 343 }
295 /* FIXME: local lookup, then (after DB done!) receive_done: */ 344 GSF_local_lookup_ (pr,
296 GNUNET_SERVER_receive_done (client, 345 &start_p2p_processing,
297 GNUNET_OK); 346 client);
298#if 0
299 /* FIXME: also do DHT lookup */
300 struct GNUNET_DHT_GetHandle *gh;
301 /* store 'gh' with 'pr', cancel it on pr destruction, etc. */
302 gh = GNUNET_DHT_get_start (GSF_dht,
303 timeout,
304 type,
305 key,
306 des_repl_level,
307 options,
308 bf,
309 bf_mutator,
310 xquery,
311 xquery_size,
312 &GSF_handle_dht_reply_,
313 pr);
314#endif
315 GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
316 pr);
317} 347}
318 348
319 349
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 45767f204..58af8be65 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -65,6 +65,16 @@ struct GSF_PendingRequest
65 struct GNUNET_CONTAINER_HeapNode *hnode; 65 struct GNUNET_CONTAINER_HeapNode *hnode;
66 66
67 /** 67 /**
68 * Datastore queue entry for this request (or NULL for none).
69 */
70 struct GNUNET_DATASTORE_QueueEntry *qe;
71
72 /**
73 * DHT request handle for this request (or NULL for none).
74 */
75 struct GNUNET_DHT_GetHandle *gh;
76
77 /**
68 * Identity of the peer that we should use for the 'sender' 78 * Identity of the peer that we should use for the 'sender'
69 * (recipient of the response) when forwarding (0 for none). 79 * (recipient of the response) when forwarding (0 for none).
70 */ 80 */
@@ -500,6 +510,10 @@ clean_request (void *cls,
500 if (NULL != pr->hnode) 510 if (NULL != pr->hnode)
501 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, 511 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
502 pr->hnode); 512 pr->hnode);
513 if (NULL != pr->qe)
514 GNUNET_DATASTORE_cancel (pr->qe);
515 if (NULL != pr->gh)
516 GNUNET_DHT_get_stop (pr->gh);
503 GNUNET_free (pr); 517 GNUNET_free (pr);
504 return GNUNET_YES; 518 return GNUNET_YES;
505} 519}
@@ -713,6 +727,10 @@ process_reply (void *cls,
713 1, 727 1,
714 GNUNET_NO); 728 GNUNET_NO);
715 } 729 }
730 else
731 {
732 GSF_dht_lookup_ (pr);
733 }
716 prq->priority += pr->public_data.original_priority; 734 prq->priority += pr->public_data.original_priority;
717 pr->public_data.priority = 0; 735 pr->public_data.priority = 0;
718 pr->public_data.original_priority = 0; 736 pr->public_data.original_priority = 0;
@@ -799,15 +817,15 @@ test_put_load_too_high (uint32_t priority)
799 * @param size number of bytes in data 817 * @param size number of bytes in data
800 * @param data pointer to the result data 818 * @param data pointer to the result data
801 */ 819 */
802void 820static void
803GSF_handle_dht_reply_ (void *cls, 821handle_dht_reply (void *cls,
804 struct GNUNET_TIME_Absolute exp, 822 struct GNUNET_TIME_Absolute exp,
805 const GNUNET_HashCode *key, 823 const GNUNET_HashCode *key,
806 const struct GNUNET_PeerIdentity * const *get_path, 824 const struct GNUNET_PeerIdentity * const *get_path,
807 const struct GNUNET_PeerIdentity * const *put_path, 825 const struct GNUNET_PeerIdentity * const *put_path,
808 enum GNUNET_BLOCK_Type type, 826 enum GNUNET_BLOCK_Type type,
809 size_t size, 827 size_t size,
810 const void *data) 828 const void *data)
811{ 829{
812 struct GSF_PendingRequest *pr = cls; 830 struct GSF_PendingRequest *pr = cls;
813 struct ProcessReplyClosure prq; 831 struct ProcessReplyClosure prq;
@@ -843,6 +861,247 @@ GSF_handle_dht_reply_ (void *cls,
843 861
844 862
845/** 863/**
864 * Consider looking up the data in the DHT (anonymity-level permitting).
865 *
866 * @param pr the pending request to process
867 */
868void
869GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
870{
871 const void *xquery;
872 size_t xquery_size;
873 struct GNUNET_PeerIdentity pi;
874 char buf[sizeof (GNUNET_HashCode) * 2];
875
876 if (0 != pr->public_data.anonymity_level)
877 return;
878 if (NULL != pr->gh)
879 {
880 GNUNET_DHT_get_stop (pr->gh);
881 pr->gh = NULL;
882 }
883 xquery = NULL;
884 xquery_size = 0;
885 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
886 {
887 xquery = buf;
888 memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
889 xquery_size = sizeof (GNUNET_HashCode);
890 }
891 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
892 {
893 GNUNET_PEER_resolve (pr->sender_pid,
894 &pi);
895 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
896 xquery_size += sizeof (struct GNUNET_PeerIdentity);
897 }
898 pr->gh = GNUNET_DHT_get_start (GSF_dht,
899 GNUNET_TIME_UNIT_FOREVER_REL,
900 pr->public_data.type,
901 &pr->public_data.query,
902 DEFAULT_GET_REPLICATION,
903 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
904 pr->bf,
905 pr->mingle,
906 xquery,
907 xquery_size,
908 &handle_dht_reply,
909 pr);
910}
911
912
913/**
914 * We're processing (local) results for a search request
915 * from another peer. Pass applicable results to the
916 * peer and if we are done either clean up (operation
917 * complete) or forward to other peers (more results possible).
918 *
919 * @param cls our closure (struct LocalGetContext)
920 * @param key key for the content
921 * @param size number of bytes in data
922 * @param data content stored
923 * @param type type of the content
924 * @param priority priority of the content
925 * @param anonymity anonymity-level for the content
926 * @param expiration expiration time for the content
927 * @param uid unique identifier for the datum;
928 * maybe 0 if no unique identifier is available
929 */
930static void
931process_local_reply (void *cls,
932 const GNUNET_HashCode * key,
933 size_t size,
934 const void *data,
935 enum GNUNET_BLOCK_Type type,
936 uint32_t priority,
937 uint32_t anonymity,
938 struct GNUNET_TIME_Absolute expiration,
939 uint64_t uid)
940{
941#if FIXME
942 struct PendingRequest *pr = cls;
943 struct ProcessReplyClosure prq;
944 struct CheckDuplicateRequestClosure cdrc;
945 GNUNET_HashCode query;
946 unsigned int old_rf;
947
948 if (NULL == key)
949 {
950#if DEBUG_FS > 1
951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952 "Done processing local replies, forwarding request to other peers.\n");
953#endif
954 pr->qe = NULL;
955 if (pr->client_request_list != NULL)
956 {
957 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
958 GNUNET_YES);
959 /* Figure out if this is a duplicate request and possibly
960 merge 'struct PendingRequest' entries */
961 cdrc.have = NULL;
962 cdrc.pr = pr;
963 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
964 &pr->query,
965 &check_duplicate_request_client,
966 &cdrc);
967 if (cdrc.have != NULL)
968 {
969#if DEBUG_FS
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Received request for block `%s' twice from client, will only request once.\n",
972 GNUNET_h2s (&pr->query));
973#endif
974
975 destroy_pending_request (pr);
976 return;
977 }
978 }
979 if (pr->local_only == GNUNET_YES)
980 {
981 destroy_pending_request (pr);
982 return;
983 }
984 /* no more results */
985 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
986 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
987 pr);
988 return;
989 }
990#if DEBUG_FS
991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992 "New local response to `%s' of type %u.\n",
993 GNUNET_h2s (key),
994 type);
995#endif
996 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
997 {
998#if DEBUG_FS
999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000 "Found ONDEMAND block, performing on-demand encoding\n");
1001#endif
1002 GNUNET_STATISTICS_update (stats,
1003 gettext_noop ("# on-demand blocks matched requests"),
1004 1,
1005 GNUNET_NO);
1006 if (GNUNET_OK !=
1007 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1008 anonymity, expiration, uid,
1009 &process_local_reply,
1010 pr))
1011 if (pr->qe != NULL)
1012 {
1013 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1014 }
1015 return;
1016 }
1017 old_rf = pr->results_found;
1018 memset (&prq, 0, sizeof (prq));
1019 prq.data = data;
1020 prq.expiration = expiration;
1021 prq.size = size;
1022 if (GNUNET_OK !=
1023 GNUNET_BLOCK_get_key (block_ctx,
1024 type,
1025 data,
1026 size,
1027 &query))
1028 {
1029 GNUNET_break (0);
1030 GNUNET_DATASTORE_remove (dsh,
1031 key,
1032 size, data,
1033 -1, -1,
1034 GNUNET_TIME_UNIT_FOREVER_REL,
1035 NULL, NULL);
1036 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1037 return;
1038 }
1039 prq.type = type;
1040 prq.priority = priority;
1041 prq.finished = GNUNET_NO;
1042 prq.request_found = GNUNET_NO;
1043 prq.anonymity_level = anonymity;
1044 if ( (old_rf == 0) &&
1045 (pr->results_found == 0) )
1046 update_datastore_delays (pr->start_time);
1047 process_reply (&prq, key, pr);
1048 if (prq.finished == GNUNET_YES)
1049 return;
1050 if (pr->qe == NULL)
1051 return; /* done here */
1052 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1053 {
1054 pr->local_only = GNUNET_YES; /* do not forward */
1055 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1056 return;
1057 }
1058 if ( (pr->client_request_list == NULL) &&
1059 ( (GNUNET_YES == test_get_load_too_high (0)) ||
1060 (pr->results_found > 5 + 2 * pr->priority) ) )
1061 {
1062#if DEBUG_FS > 2
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Load too high, done with request\n");
1065#endif
1066 GNUNET_STATISTICS_update (stats,
1067 gettext_noop ("# processing result set cut short due to load"),
1068 1,
1069 GNUNET_NO);
1070 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1071 return;
1072 }
1073 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1074#endif
1075}
1076
1077
1078/**
1079 * Look up the request in the local datastore.
1080 *
1081 * @param pr the pending request to process
1082 * @param cont function to call at the end
1083 * @param cont_cls closure for cont
1084 */
1085void
1086GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1087 GSF_LocalLookupContinuation cont,
1088 void *cont_cls)
1089{
1090 // FIXME: fix process_local_reply / cont!
1091 GNUNET_assert (NULL == pr->gh);
1092 pr->qe = GNUNET_DATASTORE_get (GSF_dsh,
1093 &pr->public_data.query,
1094 pr->public_data.type,
1095 1 /* queue priority */,
1096 1 /* max queue size */,
1097 GNUNET_TIME_UNIT_FOREVER_REL,
1098 &process_local_reply,
1099 pr);
1100}
1101
1102
1103
1104/**
846 * Handle P2P "CONTENT" message. Checks that the message is 1105 * Handle P2P "CONTENT" message. Checks that the message is
847 * well-formed and then checks if there are any pending requests for 1106 * well-formed and then checks if there are any pending requests for
848 * this content and possibly passes it on (to local clients or other 1107 * this content and possibly passes it on (to local clients or other
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index b59cbc541..39a5fc77f 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -299,29 +299,38 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
299 299
300 300
301/** 301/**
302 * Iterator called on each result obtained for a DHT 302 * Consider looking up the data in the DHT (anonymity-level permitting).
303 * operation that expects a reply
304 * 303 *
305 * @param cls closure, the 'struct GSF_PendingRequest *'. 304 * @param pr the pending request to process
306 * @param exp when will this value expire
307 * @param key key of the result
308 * @param get_path NULL-terminated array of pointers
309 * to the peers on reverse GET path (or NULL if not recorded)
310 * @param put_path NULL-terminated array of pointers
311 * to the peers on the PUT path (or NULL if not recorded)
312 * @param type type of the result
313 * @param size number of bytes in data
314 * @param data pointer to the result data
315 */ 305 */
316void 306void
317GSF_handle_dht_reply_ (void *cls, 307GSF_dht_lookup_ (struct GSF_PendingRequest *pr);
318 struct GNUNET_TIME_Absolute exp, 308
319 const GNUNET_HashCode * key, 309
320 const struct GNUNET_PeerIdentity * const *get_path, 310/**
321 const struct GNUNET_PeerIdentity * const *put_path, 311 * Function to be called after we're done processing
322 enum GNUNET_BLOCK_Type type, 312 * replies from the local lookup.
323 size_t size, 313 *
324 const void *data); 314 * @param cls closure
315 * @param pr the pending request we were processing
316 * @param result final datastore lookup result
317 */
318typedef void (GSF_LocalLookupContinuation)(void *cls,
319 struct GSF_PendingRequest *pr,
320 enum GNUNET_BLOCK_EvaluationResult result);
321
322
323/**
324 * Look up the request in the local datastore.
325 *
326 * @param pr the pending request to process
327 * @param cont function to call at the end
328 * @param cont_cls closure for cont
329 */
330void
331GSF_local_lookup_ (struct GSF_PendingRequest *pr,
332 GSF_LocalLookupContinuation cont,
333 void *cont_cls);
325 334
326 335
327/** 336/**