aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-10 09:36:50 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-10 09:36:50 +0000
commitfaecd2a496b5d356509b0b6b0157db34e8b3188e (patch)
treee5d54c0cb196a05e4a54031f8498e5ffe82bc3a9 /src/fs/gnunet-service-fs_pr.c
parent073c4a9ae448041fdc9a0683fed49d55ae61803e (diff)
downloadgnunet-faecd2a496b5d356509b0b6b0157db34e8b3188e.tar.gz
gnunet-faecd2a496b5d356509b0b6b0157db34e8b3188e.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c277
1 files changed, 268 insertions, 9 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 45767f204..58af8be65 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -65,6 +65,16 @@ struct GSF_PendingRequest
65 struct GNUNET_CONTAINER_HeapNode *hnode; 65 struct GNUNET_CONTAINER_HeapNode *hnode;
66 66
67 /** 67 /**
68 * Datastore queue entry for this request (or NULL for none).
69 */
70 struct GNUNET_DATASTORE_QueueEntry *qe;
71
72 /**
73 * DHT request handle for this request (or NULL for none).
74 */
75 struct GNUNET_DHT_GetHandle *gh;
76
77 /**
68 * Identity of the peer that we should use for the 'sender' 78 * Identity of the peer that we should use for the 'sender'
69 * (recipient of the response) when forwarding (0 for none). 79 * (recipient of the response) when forwarding (0 for none).
70 */ 80 */
@@ -500,6 +510,10 @@ clean_request (void *cls,
500 if (NULL != pr->hnode) 510 if (NULL != pr->hnode)
501 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, 511 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
502 pr->hnode); 512 pr->hnode);
513 if (NULL != pr->qe)
514 GNUNET_DATASTORE_cancel (pr->qe);
515 if (NULL != pr->gh)
516 GNUNET_DHT_get_stop (pr->gh);
503 GNUNET_free (pr); 517 GNUNET_free (pr);
504 return GNUNET_YES; 518 return GNUNET_YES;
505} 519}
@@ -713,6 +727,10 @@ process_reply (void *cls,
713 1, 727 1,
714 GNUNET_NO); 728 GNUNET_NO);
715 } 729 }
730 else
731 {
732 GSF_dht_lookup_ (pr);
733 }
716 prq->priority += pr->public_data.original_priority; 734 prq->priority += pr->public_data.original_priority;
717 pr->public_data.priority = 0; 735 pr->public_data.priority = 0;
718 pr->public_data.original_priority = 0; 736 pr->public_data.original_priority = 0;
@@ -799,15 +817,15 @@ test_put_load_too_high (uint32_t priority)
799 * @param size number of bytes in data 817 * @param size number of bytes in data
800 * @param data pointer to the result data 818 * @param data pointer to the result data
801 */ 819 */
802void 820static void
803GSF_handle_dht_reply_ (void *cls, 821handle_dht_reply (void *cls,
804 struct GNUNET_TIME_Absolute exp, 822 struct GNUNET_TIME_Absolute exp,
805 const GNUNET_HashCode *key, 823 const GNUNET_HashCode *key,
806 const struct GNUNET_PeerIdentity * const *get_path, 824 const struct GNUNET_PeerIdentity * const *get_path,
807 const struct GNUNET_PeerIdentity * const *put_path, 825 const struct GNUNET_PeerIdentity * const *put_path,
808 enum GNUNET_BLOCK_Type type, 826 enum GNUNET_BLOCK_Type type,
809 size_t size, 827 size_t size,
810 const void *data) 828 const void *data)
811{ 829{
812 struct GSF_PendingRequest *pr = cls; 830 struct GSF_PendingRequest *pr = cls;
813 struct ProcessReplyClosure prq; 831 struct ProcessReplyClosure prq;
@@ -843,6 +861,247 @@ GSF_handle_dht_reply_ (void *cls,
843 861
844 862
845/** 863/**
864 * Consider looking up the data in the DHT (anonymity-level permitting).
865 *
866 * @param pr the pending request to process
867 */
868void
869GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
870{
871 const void *xquery;
872 size_t xquery_size;
873 struct GNUNET_PeerIdentity pi;
874 char buf[sizeof (GNUNET_HashCode) * 2];
875
876 if (0 != pr->public_data.anonymity_level)
877 return;
878 if (NULL != pr->gh)
879 {
880 GNUNET_DHT_get_stop (pr->gh);
881 pr->gh = NULL;
882 }
883 xquery = NULL;
884 xquery_size = 0;
885 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
886 {
887 xquery = buf;
888 memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
889 xquery_size = sizeof (GNUNET_HashCode);
890 }
891 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
892 {
893 GNUNET_PEER_resolve (pr->sender_pid,
894 &pi);
895 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
896 xquery_size += sizeof (struct GNUNET_PeerIdentity);
897 }
898 pr->gh = GNUNET_DHT_get_start (GSF_dht,
899 GNUNET_TIME_UNIT_FOREVER_REL,
900 pr->public_data.type,
901 &pr->public_data.query,
902 DEFAULT_GET_REPLICATION,
903 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
904 pr->bf,
905 pr->mingle,
906 xquery,
907 xquery_size,
908 &handle_dht_reply,
909 pr);
910}
911
912
913/**
914 * We're processing (local) results for a search request
915 * from another peer. Pass applicable results to the
916 * peer and if we are done either clean up (operation
917 * complete) or forward to other peers (more results possible).
918 *
919 * @param cls our closure (struct LocalGetContext)
920 * @param key key for the content
921 * @param size number of bytes in data
922 * @param data content stored
923 * @param type type of the content
924 * @param priority priority of the content
925 * @param anonymity anonymity-level for the content
926 * @param expiration expiration time for the content
927 * @param uid unique identifier for the datum;
928 * maybe 0 if no unique identifier is available
929 */
930static void
931process_local_reply (void *cls,
932 const GNUNET_HashCode * key,
933 size_t size,
934 const void *data,
935 enum GNUNET_BLOCK_Type type,
936 uint32_t priority,
937 uint32_t anonymity,
938 struct GNUNET_TIME_Absolute expiration,
939 uint64_t uid)
940{
941#if FIXME
942 struct PendingRequest *pr = cls;
943 struct ProcessReplyClosure prq;
944 struct CheckDuplicateRequestClosure cdrc;
945 GNUNET_HashCode query;
946 unsigned int old_rf;
947
948 if (NULL == key)
949 {
950#if DEBUG_FS > 1
951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952 "Done processing local replies, forwarding request to other peers.\n");
953#endif
954 pr->qe = NULL;
955 if (pr->client_request_list != NULL)
956 {
957 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
958 GNUNET_YES);
959 /* Figure out if this is a duplicate request and possibly
960 merge 'struct PendingRequest' entries */
961 cdrc.have = NULL;
962 cdrc.pr = pr;
963 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
964 &pr->query,
965 &check_duplicate_request_client,
966 &cdrc);
967 if (cdrc.have != NULL)
968 {
969#if DEBUG_FS
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Received request for block `%s' twice from client, will only request once.\n",
972 GNUNET_h2s (&pr->query));
973#endif
974
975 destroy_pending_request (pr);
976 return;
977 }
978 }
979 if (pr->local_only == GNUNET_YES)
980 {
981 destroy_pending_request (pr);
982 return;
983 }
984 /* no more results */
985 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
986 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
987 pr);
988 return;
989 }
990#if DEBUG_FS
991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992 "New local response to `%s' of type %u.\n",
993 GNUNET_h2s (key),
994 type);
995#endif
996 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
997 {
998#if DEBUG_FS
999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000 "Found ONDEMAND block, performing on-demand encoding\n");
1001#endif
1002 GNUNET_STATISTICS_update (stats,
1003 gettext_noop ("# on-demand blocks matched requests"),
1004 1,
1005 GNUNET_NO);
1006 if (GNUNET_OK !=
1007 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1008 anonymity, expiration, uid,
1009 &process_local_reply,
1010 pr))
1011 if (pr->qe != NULL)
1012 {
1013 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1014 }
1015 return;
1016 }
1017 old_rf = pr->results_found;
1018 memset (&prq, 0, sizeof (prq));
1019 prq.data = data;
1020 prq.expiration = expiration;
1021 prq.size = size;
1022 if (GNUNET_OK !=
1023 GNUNET_BLOCK_get_key (block_ctx,
1024 type,
1025 data,
1026 size,
1027 &query))
1028 {
1029 GNUNET_break (0);
1030 GNUNET_DATASTORE_remove (dsh,
1031 key,
1032 size, data,
1033 -1, -1,
1034 GNUNET_TIME_UNIT_FOREVER_REL,
1035 NULL, NULL);
1036 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1037 return;
1038 }
1039 prq.type = type;
1040 prq.priority = priority;
1041 prq.finished = GNUNET_NO;
1042 prq.request_found = GNUNET_NO;
1043 prq.anonymity_level = anonymity;
1044 if ( (old_rf == 0) &&
1045 (pr->results_found == 0) )
1046 update_datastore_delays (pr->start_time);
1047 process_reply (&prq, key, pr);
1048 if (prq.finished == GNUNET_YES)
1049 return;
1050 if (pr->qe == NULL)
1051 return; /* done here */
1052 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1053 {
1054 pr->local_only = GNUNET_YES; /* do not forward */
1055 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1056 return;
1057 }
1058 if ( (pr->client_request_list == NULL) &&
1059 ( (GNUNET_YES == test_get_load_too_high (0)) ||
1060 (pr->results_found > 5 + 2 * pr->priority) ) )
1061 {
1062#if DEBUG_FS > 2
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Load too high, done with request\n");
1065#endif
1066 GNUNET_STATISTICS_update (stats,
1067 gettext_noop ("# processing result set cut short due to load"),
1068 1,
1069 GNUNET_NO);
1070 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1071 return;
1072 }
1073 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1074#endif
1075}
1076
1077
1078/**
1079 * Look up the request in the local datastore.
1080 *
1081 * @param pr the pending request to process
1082 * @param cont function to call at the end
1083 * @param cont_cls closure for cont
1084 */
1085void
1086GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1087 GSF_LocalLookupContinuation cont,
1088 void *cont_cls)
1089{
1090 // FIXME: fix process_local_reply / cont!
1091 GNUNET_assert (NULL == pr->gh);
1092 pr->qe = GNUNET_DATASTORE_get (GSF_dsh,
1093 &pr->public_data.query,
1094 pr->public_data.type,
1095 1 /* queue priority */,
1096 1 /* max queue size */,
1097 GNUNET_TIME_UNIT_FOREVER_REL,
1098 &process_local_reply,
1099 pr);
1100}
1101
1102
1103
1104/**
846 * Handle P2P "CONTENT" message. Checks that the message is 1105 * Handle P2P "CONTENT" message. Checks that the message is
847 * well-formed and then checks if there are any pending requests for 1106 * well-formed and then checks if there are any pending requests for
848 * this content and possibly passes it on (to local clients or other 1107 * this content and possibly passes it on (to local clients or other