aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-10 17:34:42 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-10 17:34:42 +0000
commite87e273ce5f864e20fcca02c34bef72de4fc00bd (patch)
tree14125cc08e325f9133d4a7a8b43b7de5f7fece94 /src/fs/gnunet-service-fs_pr.c
parente253ed6e7e5ac2a113b6dbc762258f358c9ca5ae (diff)
downloadgnunet-e87e273ce5f864e20fcca02c34bef72de4fc00bd.tar.gz
gnunet-e87e273ce5f864e20fcca02c34bef72de4fc00bd.zip
load
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c106
1 files changed, 47 insertions, 59 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 58af8be65..0fdcd0cf1 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -26,6 +26,7 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_load_lib.h" 27#include "gnunet_load_lib.h"
28#include "gnunet-service-fs_cp.h" 28#include "gnunet-service-fs_cp.h"
29#include "gnunet-service-fs_indexing.h"
29#include "gnunet-service-fs_pr.h" 30#include "gnunet-service-fs_pr.h"
30 31
31 32
@@ -75,6 +76,22 @@ struct GSF_PendingRequest
75 struct GNUNET_DHT_GetHandle *gh; 76 struct GNUNET_DHT_GetHandle *gh;
76 77
77 /** 78 /**
79 * Function to call upon completion of the local get
80 * request, or NULL for none.
81 */
82 GSF_LocalLookupContinuation llc_cont;
83
84 /**
85 * Closure for llc_cont.
86 */
87 void *llc_cont_cls;
88
89 /**
90 * Last result from the local datastore lookup evaluation.
91 */
92 enum GNUNET_BLOCK_EvaluationResult local_result;
93
94 /**
78 * Identity of the peer that we should use for the 'sender' 95 * Identity of the peer that we should use for the 'sender'
79 * (recipient of the response) when forwarding (0 for none). 96 * (recipient of the response) when forwarding (0 for none).
80 */ 97 */
@@ -683,6 +700,7 @@ process_reply (void *cls,
683 prq->data, prq->size, 700 prq->data, prq->size,
684 GNUNET_NO); 701 GNUNET_NO);
685 /* destroy request, we're done */ 702 /* destroy request, we're done */
703 prq->finished = GNUNET_YES;
686 GSF_pending_request_cancel_ (pr); 704 GSF_pending_request_cancel_ (pr);
687 return GNUNET_YES; 705 return GNUNET_YES;
688 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 706 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
@@ -909,14 +927,13 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
909 pr); 927 pr);
910} 928}
911 929
912
913/** 930/**
914 * We're processing (local) results for a search request 931 * We're processing (local) results for a search request
915 * from another peer. Pass applicable results to the 932 * from another peer. Pass applicable results to the
916 * peer and if we are done either clean up (operation 933 * peer and if we are done either clean up (operation
917 * complete) or forward to other peers (more results possible). 934 * complete) or forward to other peers (more results possible).
918 * 935 *
919 * @param cls our closure (struct LocalGetContext) 936 * @param cls our closure (struct PendingRequest)
920 * @param key key for the content 937 * @param key key for the content
921 * @param size number of bytes in data 938 * @param size number of bytes in data
922 * @param data content stored 939 * @param data content stored
@@ -938,53 +955,23 @@ process_local_reply (void *cls,
938 struct GNUNET_TIME_Absolute expiration, 955 struct GNUNET_TIME_Absolute expiration,
939 uint64_t uid) 956 uint64_t uid)
940{ 957{
941#if FIXME 958 struct GSF_PendingRequest *pr = cls;
942 struct PendingRequest *pr = cls; 959 GSF_LocalLookupContinuation cont;
960
943 struct ProcessReplyClosure prq; 961 struct ProcessReplyClosure prq;
944 struct CheckDuplicateRequestClosure cdrc;
945 GNUNET_HashCode query; 962 GNUNET_HashCode query;
946 unsigned int old_rf; 963 unsigned int old_rf;
947 964
948 if (NULL == key) 965 if (NULL == key)
949 { 966 {
950#if DEBUG_FS > 1
951 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952 "Done processing local replies, forwarding request to other peers.\n");
953#endif
954 pr->qe = NULL; 967 pr->qe = NULL;
955 if (pr->client_request_list != NULL) 968 if (NULL != (cont = pr->llc_cont))
956 {
957 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
958 GNUNET_YES);
959 /* Figure out if this is a duplicate request and possibly
960 merge 'struct PendingRequest' entries */
961 cdrc.have = NULL;
962 cdrc.pr = pr;
963 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
964 &pr->query,
965 &check_duplicate_request_client,
966 &cdrc);
967 if (cdrc.have != NULL)
968 {
969#if DEBUG_FS
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Received request for block `%s' twice from client, will only request once.\n",
972 GNUNET_h2s (&pr->query));
973#endif
974
975 destroy_pending_request (pr);
976 return;
977 }
978 }
979 if (pr->local_only == GNUNET_YES)
980 { 969 {
981 destroy_pending_request (pr); 970 pr->llc_cont = NULL;
982 return; 971 cont (pr->llc_cont_cls,
972 pr,
973 pr->local_result);
983 } 974 }
984 /* no more results */
985 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
986 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
987 pr);
988 return; 975 return;
989 } 976 }
990#if DEBUG_FS 977#if DEBUG_FS
@@ -999,7 +986,7 @@ process_local_reply (void *cls,
999 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000 "Found ONDEMAND block, performing on-demand encoding\n"); 987 "Found ONDEMAND block, performing on-demand encoding\n");
1001#endif 988#endif
1002 GNUNET_STATISTICS_update (stats, 989 GNUNET_STATISTICS_update (GSF_stats,
1003 gettext_noop ("# on-demand blocks matched requests"), 990 gettext_noop ("# on-demand blocks matched requests"),
1004 1, 991 1,
1005 GNUNET_NO); 992 GNUNET_NO);
@@ -1008,32 +995,32 @@ process_local_reply (void *cls,
1008 anonymity, expiration, uid, 995 anonymity, expiration, uid,
1009 &process_local_reply, 996 &process_local_reply,
1010 pr)) 997 pr))
1011 if (pr->qe != NULL)
1012 { 998 {
1013 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 999 if (pr->qe != NULL)
1000 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
1014 } 1001 }
1015 return; 1002 return;
1016 } 1003 }
1017 old_rf = pr->results_found; 1004 old_rf = pr->public_data.results_found;
1018 memset (&prq, 0, sizeof (prq)); 1005 memset (&prq, 0, sizeof (prq));
1019 prq.data = data; 1006 prq.data = data;
1020 prq.expiration = expiration; 1007 prq.expiration = expiration;
1021 prq.size = size; 1008 prq.size = size;
1022 if (GNUNET_OK != 1009 if (GNUNET_OK !=
1023 GNUNET_BLOCK_get_key (block_ctx, 1010 GNUNET_BLOCK_get_key (GSF_block_ctx,
1024 type, 1011 type,
1025 data, 1012 data,
1026 size, 1013 size,
1027 &query)) 1014 &query))
1028 { 1015 {
1029 GNUNET_break (0); 1016 GNUNET_break (0);
1030 GNUNET_DATASTORE_remove (dsh, 1017 GNUNET_DATASTORE_remove (GSF_dsh,
1031 key, 1018 key,
1032 size, data, 1019 size, data,
1033 -1, -1, 1020 -1, -1,
1034 GNUNET_TIME_UNIT_FOREVER_REL, 1021 GNUNET_TIME_UNIT_FOREVER_REL,
1035 NULL, NULL); 1022 NULL, NULL);
1036 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 1023 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
1037 return; 1024 return;
1038 } 1025 }
1039 prq.type = type; 1026 prq.type = type;
@@ -1042,36 +1029,35 @@ process_local_reply (void *cls,
1042 prq.request_found = GNUNET_NO; 1029 prq.request_found = GNUNET_NO;
1043 prq.anonymity_level = anonymity; 1030 prq.anonymity_level = anonymity;
1044 if ( (old_rf == 0) && 1031 if ( (old_rf == 0) &&
1045 (pr->results_found == 0) ) 1032 (pr->public_data.results_found == 0) )
1046 update_datastore_delays (pr->start_time); 1033 GSF_update_datastore_delay_ (pr->public_data.start_time);
1047 process_reply (&prq, key, pr); 1034 process_reply (&prq, key, pr);
1048 if (prq.finished == GNUNET_YES) 1035 if (prq.finished == GNUNET_YES)
1049 return; 1036 return;
1037 pr->local_result = prq.eval;
1050 if (pr->qe == NULL) 1038 if (pr->qe == NULL)
1051 return; /* done here */ 1039 return; /* done here */
1052 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) 1040 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1053 { 1041 {
1054 pr->local_only = GNUNET_YES; /* do not forward */ 1042 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_NO);
1055 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1056 return; 1043 return;
1057 } 1044 }
1058 if ( (pr->client_request_list == NULL) && 1045 if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1059 ( (GNUNET_YES == test_get_load_too_high (0)) || 1046 ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1060 (pr->results_found > 5 + 2 * pr->priority) ) ) 1047 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) )
1061 { 1048 {
1062#if DEBUG_FS > 2 1049#if DEBUG_FS > 2
1063 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1050 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064 "Load too high, done with request\n"); 1051 "Load too high, done with request\n");
1065#endif 1052#endif
1066 GNUNET_STATISTICS_update (stats, 1053 GNUNET_STATISTICS_update (GSF_stats,
1067 gettext_noop ("# processing result set cut short due to load"), 1054 gettext_noop ("# processing result set cut short due to load"),
1068 1, 1055 1,
1069 GNUNET_NO); 1056 GNUNET_NO);
1070 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); 1057 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_NO);
1071 return; 1058 return;
1072 } 1059 }
1073 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 1060 GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES);
1074#endif
1075} 1061}
1076 1062
1077 1063
@@ -1087,8 +1073,10 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1087 GSF_LocalLookupContinuation cont, 1073 GSF_LocalLookupContinuation cont,
1088 void *cont_cls) 1074 void *cont_cls)
1089{ 1075{
1090 // FIXME: fix process_local_reply / cont!
1091 GNUNET_assert (NULL == pr->gh); 1076 GNUNET_assert (NULL == pr->gh);
1077 GNUNET_assert (NULL == pr->llc_cont);
1078 pr->llc_cont = cont;
1079 pr->llc_cont_cls = cont_cls;
1092 pr->qe = GNUNET_DATASTORE_get (GSF_dsh, 1080 pr->qe = GNUNET_DATASTORE_get (GSF_dsh,
1093 &pr->public_data.query, 1081 &pr->public_data.query,
1094 pr->public_data.type, 1082 pr->public_data.type,