diff options
author | Christian Grothoff <christian@grothoff.org> | 2010-01-29 19:21:41 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2010-01-29 19:21:41 +0000 |
commit | 258bd33b0a8e26200d8bf36d8e65524a1069790d (patch) | |
tree | b45169bffadac84468c33d1fd696ff81ee84a3b3 /src/fs | |
parent | 65f29cbee10775c4ec627f01c115d1bebea88642 (diff) | |
download | gnunet-258bd33b0a8e26200d8bf36d8e65524a1069790d.tar.gz gnunet-258bd33b0a8e26200d8bf36d8e65524a1069790d.zip |
finally compiles again
Diffstat (limited to 'src/fs')
-rw-r--r-- | src/fs/Makefile.am | 3 | ||||
-rw-r--r-- | src/fs/fs.h | 49 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.c | 3322 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_drq.c | 416 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_drq.h | 137 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_indexing.c | 37 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_indexing.h | 12 |
7 files changed, 2038 insertions, 1938 deletions
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 70897c5ef..a43c8340d 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am | |||
@@ -85,7 +85,8 @@ gnunet_search_LDADD = \ | |||
85 | 85 | ||
86 | gnunet_service_fs_SOURCES = \ | 86 | gnunet_service_fs_SOURCES = \ |
87 | gnunet-service-fs.c \ | 87 | gnunet-service-fs.c \ |
88 | gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h | 88 | gnunet-service-fs_drq.c gnunet-service-fs_drq.h \ |
89 | gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h | ||
89 | gnunet_service_fs_LDADD = \ | 90 | gnunet_service_fs_LDADD = \ |
90 | $(top_builddir)/src/fs/libgnunetfs.la \ | 91 | $(top_builddir)/src/fs/libgnunetfs.la \ |
91 | $(top_builddir)/src/datastore/libgnunetdatastore.la \ | 92 | $(top_builddir)/src/datastore/libgnunetdatastore.la \ |
diff --git a/src/fs/fs.h b/src/fs/fs.h index ae2e67654..d19f27b6f 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h | |||
@@ -1132,28 +1132,23 @@ struct SBlock | |||
1132 | 1132 | ||
1133 | 1133 | ||
1134 | /** | 1134 | /** |
1135 | * Message sent from a GNUnet (fs) publishing | 1135 | * Message sent from a GNUnet (fs) publishing activity to the |
1136 | * activity to the gnunet-fs-service to | 1136 | * gnunet-fs-service to initiate indexing of a file. The service is |
1137 | * initiate indexing of a file. The service | 1137 | * supposed to check if the specified file is available and has the |
1138 | * is supposed to check if the specified file | 1138 | * same cryptographic hash. It should then respond with either a |
1139 | * is available and has the same cryptographic | 1139 | * confirmation or a denial. |
1140 | * hash. It should then respond with either | ||
1141 | * a confirmation or a denial. | ||
1142 | * | 1140 | * |
1143 | * On OSes where this works, it is considered | 1141 | * On OSes where this works, it is considered acceptable if the |
1144 | * acceptable if the service only checks that | 1142 | * service only checks that the path, device and inode match (it can |
1145 | * the path, device and inode match (it can | 1143 | * then be assumed that the hash will also match without actually |
1146 | * then be assumed that the hash will also match | 1144 | * computing it; this is an optimization that should be safe given |
1147 | * without actually computing it; this is an | 1145 | * that the client is not our adversary). |
1148 | * optimization that should be safe given that | ||
1149 | * the client is not our adversary). | ||
1150 | */ | 1146 | */ |
1151 | struct IndexStartMessage | 1147 | struct IndexStartMessage |
1152 | { | 1148 | { |
1153 | 1149 | ||
1154 | /** | 1150 | /** |
1155 | * Message type will be | 1151 | * Message type will be GNUNET_MESSAGE_TYPE_FS_INDEX_START. |
1156 | * GNUNET_MESSAGE_TYPE_FS_INDEX_START. | ||
1157 | */ | 1152 | */ |
1158 | struct GNUNET_MessageHeader header; | 1153 | struct GNUNET_MessageHeader header; |
1159 | 1154 | ||
@@ -1216,12 +1211,10 @@ struct IndexInfoMessage | |||
1216 | 1211 | ||
1217 | 1212 | ||
1218 | /** | 1213 | /** |
1219 | * Message sent from a GNUnet (fs) unindexing | 1214 | * Message sent from a GNUnet (fs) unindexing activity to the |
1220 | * activity to the gnunet-fs-service to | 1215 | * gnunet-service-fs to indicate that a file will be unindexed. The |
1221 | * indicate that a file will be unindexed. The service | 1216 | * service is supposed to remove the file from the list of indexed |
1222 | * is supposed to remove the file from the | 1217 | * files and response with a confirmation message (even if the file |
1223 | * list of indexed files and response with | ||
1224 | * a confirmation message (even if the file | ||
1225 | * was already not on the list). | 1218 | * was already not on the list). |
1226 | */ | 1219 | */ |
1227 | struct UnindexMessage | 1220 | struct UnindexMessage |
@@ -1247,9 +1240,8 @@ struct UnindexMessage | |||
1247 | 1240 | ||
1248 | 1241 | ||
1249 | /** | 1242 | /** |
1250 | * Message sent from a GNUnet (fs) search | 1243 | * Message sent from a GNUnet (fs) search activity to the |
1251 | * activity to the gnunet-fs-service to | 1244 | * gnunet-service-fs to start a search. |
1252 | * start a search. | ||
1253 | */ | 1245 | */ |
1254 | struct SearchMessage | 1246 | struct SearchMessage |
1255 | { | 1247 | { |
@@ -1308,10 +1300,9 @@ struct SearchMessage | |||
1308 | 1300 | ||
1309 | 1301 | ||
1310 | /** | 1302 | /** |
1311 | * Response from FS service with a result for | 1303 | * Response from FS service with a result for a previous FS search. |
1312 | * a previous FS search. Note that queries | 1304 | * Note that queries for DBLOCKS and IBLOCKS that have received a |
1313 | * for DBLOCKS and IBLOCKS that have received | 1305 | * single response are considered done. |
1314 | * a single response are considered done. | ||
1315 | */ | 1306 | */ |
1316 | struct ContentMessage | 1307 | struct ContentMessage |
1317 | { | 1308 | { |
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 57b6dd421..740f63624 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | (C) 2009 Christian Grothoff (and other contributing authors) | 3 | (C) 2009, 2010 Christian Grothoff (and other contributing authors) |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -20,341 +20,363 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file fs/gnunet-service-fs.c | 22 | * @file fs/gnunet-service-fs.c |
23 | * @brief program that provides the file-sharing service | 23 | * @brief gnunet anonymity protocol implementation |
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * | 25 | * |
26 | * TODO: | 26 | * TODO: |
27 | * - fix gazillion of minor FIXME's | 27 | * - forward_request_task (P2P forwarding!) |
28 | * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the | 28 | * - track stats for hot-path routing |
29 | * core to transmit to another peer; need to make sure this is bounded overall... | 29 | * - implement hot-path routing decision procedure |
30 | * - randomly delay processing for improved anonymity (can wait) | 30 | * - detect duplicate requests (P2P and CS) |
31 | * - content migration (put in local DS) (can wait) | 31 | * - implement: bound_priority, test_load_too_high, validate_skblock |
32 | * - handle some special cases when forwarding replies based on tracked requests (can wait) | 32 | * - add content migration support (store locally) |
33 | * - tracking of success correlations for hot-path routing (can wait) | 33 | * - add random delay |
34 | * - various load-based actions (can wait) | 34 | * - statistics |
35 | * - validation of KSBLOCKS (can wait) | 35 | * |
36 | * - remove on-demand blocks if they keep failing (can wait) | ||
37 | * - check that we decrement PIDs always where necessary (can wait) | ||
38 | * - find out how to use core-pulling instead of pushing (at least for some cases) | ||
39 | */ | 36 | */ |
40 | #include "platform.h" | 37 | #include "platform.h" |
41 | #include <float.h> | 38 | #include <float.h> |
39 | #include "gnunet_constants.h" | ||
42 | #include "gnunet_core_service.h" | 40 | #include "gnunet_core_service.h" |
43 | #include "gnunet_datastore_service.h" | 41 | #include "gnunet_datastore_service.h" |
44 | #include "gnunet_peer_lib.h" | 42 | #include "gnunet_peer_lib.h" |
45 | #include "gnunet_protocols.h" | 43 | #include "gnunet_protocols.h" |
46 | #include "gnunet_signatures.h" | 44 | #include "gnunet_signatures.h" |
47 | #include "gnunet_util_lib.h" | 45 | #include "gnunet_util_lib.h" |
46 | #include "gnunet-service-fs_drq.h" | ||
48 | #include "gnunet-service-fs_indexing.h" | 47 | #include "gnunet-service-fs_indexing.h" |
49 | #include "fs.h" | 48 | #include "fs.h" |
50 | 49 | ||
51 | #define DEBUG_FS GNUNET_NO | 50 | /** |
51 | * Maximum number of outgoing messages we queue per peer. | ||
52 | * FIXME: set to a tiny value for testing; make configurable. | ||
53 | */ | ||
54 | #define MAX_QUEUE_PER_PEER 2 | ||
55 | |||
56 | |||
57 | |||
58 | /** | ||
59 | * Maximum number of requests (from other peers) that we're | ||
60 | * willing to have pending at any given point in time. | ||
61 | * FIXME: set from configuration (and 32 is a tiny value for testing only). | ||
62 | */ | ||
63 | static uint64_t max_pending_requests = 32; | ||
64 | |||
65 | |||
66 | /** | ||
67 | * Information we keep for each pending reply. The | ||
68 | * actual message follows at the end of this struct. | ||
69 | */ | ||
70 | struct PendingMessage; | ||
71 | |||
52 | 72 | ||
53 | /** | 73 | /** |
54 | * Signature of a function that is called whenever a datastore | 74 | * Function called upon completion of a transmission. |
55 | * request can be processed (or an entry put on the queue times out). | ||
56 | * | 75 | * |
57 | * @param cls closure | 76 | * @param cls closure |
58 | * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout | 77 | * @param pid ID of receiving peer, 0 on transmission error |
59 | */ | 78 | */ |
60 | typedef void (*RequestFunction)(void *cls, | 79 | typedef void (*TransmissionContinuation)(void * cls, |
61 | int ok); | 80 | GNUNET_PEER_Id tpid); |
62 | 81 | ||
63 | 82 | ||
64 | /** | 83 | /** |
65 | * Doubly-linked list of our requests for the datastore. | 84 | * Information we keep for each pending reply. The |
85 | * actual message follows at the end of this struct. | ||
66 | */ | 86 | */ |
67 | struct DatastoreRequestQueue | 87 | struct PendingMessage |
68 | { | 88 | { |
69 | |||
70 | /** | 89 | /** |
71 | * This is a doubly-linked list. | 90 | * This is a doubly-linked list of messages to the same peer. |
72 | */ | 91 | */ |
73 | struct DatastoreRequestQueue *next; | 92 | struct PendingMessage *next; |
74 | 93 | ||
75 | /** | 94 | /** |
76 | * This is a doubly-linked list. | 95 | * This is a doubly-linked list of messages to the same peer. |
77 | */ | 96 | */ |
78 | struct DatastoreRequestQueue *prev; | 97 | struct PendingMessage *prev; |
79 | 98 | ||
80 | /** | 99 | /** |
81 | * Function to call (will issue the request). | 100 | * Entry in pending message list for this pending message. |
101 | */ | ||
102 | struct PendingMessageList *pml; | ||
103 | |||
104 | /** | ||
105 | * Function to call immediately once we have transmitted this | ||
106 | * message. | ||
82 | */ | 107 | */ |
83 | RequestFunction req; | 108 | TransmissionContinuation cont; |
84 | 109 | ||
85 | /** | 110 | /** |
86 | * Closure for req. | 111 | * Closure for cont. |
87 | */ | 112 | */ |
88 | void *req_cls; | 113 | void *cont_cls; |
89 | 114 | ||
90 | /** | 115 | /** |
91 | * When should this request time-out because we don't care anymore? | 116 | * Size of the reply; actual reply message follows |
117 | * at the end of this struct. | ||
92 | */ | 118 | */ |
93 | struct GNUNET_TIME_Absolute timeout; | 119 | size_t msize; |
94 | 120 | ||
95 | /** | 121 | /** |
96 | * ID of task used for signaling timeout. | 122 | * How important is this message for us? |
97 | */ | 123 | */ |
98 | GNUNET_SCHEDULER_TaskIdentifier task; | 124 | uint32_t priority; |
99 | 125 | ||
100 | }; | 126 | }; |
101 | 127 | ||
102 | 128 | ||
103 | /** | 129 | /** |
104 | * Closure for processing START_SEARCH messages from a client. | 130 | * Information about a peer that we are connected to. |
131 | * We track data that is useful for determining which | ||
132 | * peers should receive our requests. We also keep | ||
133 | * a list of messages to transmit to this peer. | ||
105 | */ | 134 | */ |
106 | struct LocalGetContext | 135 | struct ConnectedPeer |
107 | { | 136 | { |
108 | 137 | ||
109 | /** | 138 | /** |
110 | * This is a doubly-linked list. | 139 | * List of the last clients for which this peer successfully |
140 | * answered a query. | ||
111 | */ | 141 | */ |
112 | struct LocalGetContext *next; | 142 | struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; |
113 | 143 | ||
114 | /** | 144 | /** |
115 | * This is a doubly-linked list. | 145 | * List of the last PIDs for which |
146 | * this peer successfully answered a query; | ||
147 | * We use 0 to indicate no successful reply. | ||
116 | */ | 148 | */ |
117 | struct LocalGetContext *prev; | 149 | GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; |
118 | 150 | ||
119 | /** | 151 | /** |
120 | * Client that initiated the search. | 152 | * Average delay between sending the peer a request and |
121 | */ | 153 | * getting a reply (only calculated over the requests for |
122 | struct GNUNET_SERVER_Client *client; | 154 | * which we actually got a reply). Calculated |
155 | * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n | ||
156 | */ | ||
157 | struct GNUNET_TIME_Relative avg_delay; | ||
123 | 158 | ||
124 | /** | 159 | /** |
125 | * Array of results that we've already received | 160 | * Handle for an active request for transmission to this |
126 | * (can be NULL). | 161 | * peer, or NULL. |
127 | */ | 162 | */ |
128 | GNUNET_HashCode *results; | 163 | struct GNUNET_CORE_TransmitHandle *cth; |
129 | 164 | ||
130 | /** | 165 | /** |
131 | * Bloomfilter over all results (for fast query construction); | 166 | * Messages (replies, queries, content migration) we would like to |
132 | * NULL if we don't have any results. | 167 | * send to this peer in the near future. Sorted by priority, head. |
133 | * | ||
134 | * FIXME: this member is not used, is that OK? If so, it should | ||
135 | * be removed! | ||
136 | */ | 168 | */ |
137 | struct GNUNET_CONTAINER_BloomFilter *results_bf; | 169 | struct PendingMessage *pending_messages_head; |
138 | 170 | ||
139 | /** | 171 | /** |
140 | * DS request associated with this operation. | 172 | * Messages (replies, queries, content migration) we would like to |
173 | * send to this peer in the near future. Sorted by priority, tail. | ||
141 | */ | 174 | */ |
142 | struct DatastoreRequestQueue *req; | 175 | struct PendingMessage *pending_messages_tail; |
143 | 176 | ||
144 | /** | 177 | /** |
145 | * Current result message to transmit to client (or NULL). | 178 | * Average priority of successful replies. Calculated |
146 | */ | 179 | * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n |
147 | struct ContentMessage *result; | ||
148 | |||
149 | /** | ||
150 | * Type of the content that we're looking for. | ||
151 | * 0 for any. | ||
152 | */ | 180 | */ |
153 | uint32_t type; | 181 | double avg_priority; |
154 | 182 | ||
155 | /** | 183 | /** |
156 | * Desired anonymity level. | 184 | * Increase in traffic preference still to be submitted |
185 | * to the core service for this peer. FIXME: double or 'uint64_t'? | ||
157 | */ | 186 | */ |
158 | uint32_t anonymity_level; | 187 | double inc_preference; |
159 | 188 | ||
160 | /** | 189 | /** |
161 | * Number of results actually stored in the results array. | 190 | * The peer's identity. |
162 | */ | ||
163 | unsigned int results_used; | ||
164 | |||
165 | /** | ||
166 | * Size of the results array in memory. | ||
167 | */ | ||
168 | unsigned int results_size; | ||
169 | |||
170 | /** | ||
171 | * Size (in bytes) of the 'results_bf' bloomfilter. | ||
172 | * | ||
173 | * FIXME: this member is not used, is that OK? If so, it should | ||
174 | * be removed! | ||
175 | */ | 191 | */ |
176 | size_t results_bf_size; | 192 | GNUNET_PEER_Id pid; |
177 | 193 | ||
178 | /** | 194 | /** |
179 | * If the request is for a DBLOCK or IBLOCK, this is the identity of | 195 | * Size of the linked list of 'pending_messages'. |
180 | * the peer that is known to have a response. Set to all-zeros if | ||
181 | * such a target is not known (note that even if OUR anonymity | ||
182 | * level is >0 we may happen to know the responder's identity; | ||
183 | * nevertheless, we should probably not use it for a DHT-lookup | ||
184 | * or similar blunt actions in order to avoid exposing ourselves). | ||
185 | */ | 196 | */ |
186 | struct GNUNET_PeerIdentity target; | 197 | unsigned int pending_requests; |
187 | 198 | ||
188 | /** | 199 | /** |
189 | * If the request is for an SBLOCK, this is the identity of the | 200 | * Which offset in "last_p2p_replies" will be updated next? |
190 | * pseudonym to which the SBLOCK belongs. | 201 | * (we go round-robin). |
191 | */ | 202 | */ |
192 | GNUNET_HashCode namespace; | 203 | unsigned int last_p2p_replies_woff; |
193 | 204 | ||
194 | /** | 205 | /** |
195 | * Hash of the keyword (aka query) for KBLOCKs; Hash of | 206 | * Which offset in "last_client_replies" will be updated next? |
196 | * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query) | 207 | * (we go round-robin). |
197 | * and hash of the identifier XORed with the target for | ||
198 | * SBLOCKS (aka query). | ||
199 | */ | 208 | */ |
200 | GNUNET_HashCode query; | 209 | unsigned int last_client_replies_woff; |
201 | 210 | ||
202 | }; | 211 | }; |
203 | 212 | ||
204 | 213 | ||
205 | /** | 214 | /** |
206 | * Possible routing policies for an FS-GET request. | 215 | * Information we keep for each pending request. We should try to |
216 | * keep this struct as small as possible since its memory consumption | ||
217 | * is key to how many requests we can have pending at once. | ||
207 | */ | 218 | */ |
208 | enum RoutingPolicy | 219 | struct PendingRequest; |
209 | { | ||
210 | /** | ||
211 | * Simply drop the request. | ||
212 | */ | ||
213 | ROUTING_POLICY_NONE = 0, | ||
214 | |||
215 | /** | ||
216 | * Answer it if we can from local datastore. | ||
217 | */ | ||
218 | ROUTING_POLICY_ANSWER = 1, | ||
219 | |||
220 | /** | ||
221 | * Forward the request to other peers (if possible). | ||
222 | */ | ||
223 | ROUTING_POLICY_FORWARD = 2, | ||
224 | |||
225 | /** | ||
226 | * Forward to other peers, and ask them to route | ||
227 | * the response via ourselves. | ||
228 | */ | ||
229 | ROUTING_POLICY_INDIRECT = 6, | ||
230 | |||
231 | /** | ||
232 | * Do everything we could possibly do (that would | ||
233 | * make sense). | ||
234 | */ | ||
235 | ROUTING_POLICY_ALL = 7 | ||
236 | }; | ||
237 | 220 | ||
238 | 221 | ||
239 | /** | 222 | /** |
240 | * Internal context we use for our initial processing | 223 | * Doubly-linked list of requests we are performing |
241 | * of a GET request. | 224 | * on behalf of the same client. |
242 | */ | 225 | */ |
243 | struct ProcessGetContext | 226 | struct ClientRequestList |
244 | { | 227 | { |
228 | |||
245 | /** | 229 | /** |
246 | * The search query (used for datastore lookup). | 230 | * This is a doubly-linked list. |
247 | */ | 231 | */ |
248 | GNUNET_HashCode query; | 232 | struct ClientRequestList *next; |
249 | 233 | ||
250 | /** | 234 | /** |
251 | * Which peer we should forward the response to. | 235 | * This is a doubly-linked list. |
252 | */ | 236 | */ |
253 | struct GNUNET_PeerIdentity reply_to; | 237 | struct ClientRequestList *prev; |
254 | 238 | ||
255 | /** | 239 | /** |
256 | * Namespace for the result (only set for SKS requests) | 240 | * Request this entry represents. |
257 | */ | 241 | */ |
258 | GNUNET_HashCode namespace; | 242 | struct PendingRequest *req; |
259 | 243 | ||
260 | /** | 244 | /** |
261 | * Peer that we should forward the query to if possible | 245 | * Client list this request belongs to. |
262 | * (since that peer likely has the content). | ||
263 | */ | 246 | */ |
264 | struct GNUNET_PeerIdentity prime_target; | 247 | struct ClientList *client_list; |
265 | 248 | ||
249 | }; | ||
250 | |||
251 | |||
252 | /** | ||
253 | * Replies to be transmitted to the client. The actual | ||
254 | * response message is allocated after this struct. | ||
255 | */ | ||
256 | struct ClientResponseMessage | ||
257 | { | ||
266 | /** | 258 | /** |
267 | * When did we receive this request? | 259 | * This is a doubly-linked list. |
268 | */ | 260 | */ |
269 | struct GNUNET_TIME_Absolute start_time; | 261 | struct ClientResponseMessage *next; |
270 | 262 | ||
271 | /** | 263 | /** |
272 | * Our entry in the DRQ (non-NULL while we wait for our | 264 | * This is a doubly-linked list. |
273 | * turn to interact with the local database). | ||
274 | */ | 265 | */ |
275 | struct DatastoreRequestQueue *drq; | 266 | struct ClientResponseMessage *prev; |
276 | 267 | ||
277 | /** | 268 | /** |
278 | * Filter used to eliminate duplicate results. Can be NULL if we | 269 | * Client list entry this response belongs to. |
279 | * are not yet filtering any results. | ||
280 | */ | 270 | */ |
281 | struct GNUNET_CONTAINER_BloomFilter *bf; | 271 | struct ClientList *client_list; |
282 | 272 | ||
283 | /** | 273 | /** |
284 | * Bitmap describing which of the optional | 274 | * Number of bytes in the response. |
285 | * hash codes / peer identities were given to us. | ||
286 | */ | 275 | */ |
287 | uint32_t bm; | 276 | size_t msize; |
277 | }; | ||
278 | |||
288 | 279 | ||
280 | /** | ||
281 | * Linked list of clients we are performing requests | ||
282 | * for right now. | ||
283 | */ | ||
284 | struct ClientList | ||
285 | { | ||
289 | /** | 286 | /** |
290 | * Desired block type. | 287 | * This is a linked list. |
291 | */ | 288 | */ |
292 | uint32_t type; | 289 | struct ClientList *next; |
293 | 290 | ||
294 | /** | 291 | /** |
295 | * Priority of the request. | 292 | * ID of a client making a request, NULL if this entry is for a |
293 | * peer. | ||
296 | */ | 294 | */ |
297 | uint32_t priority; | 295 | struct GNUNET_SERVER_Client *client; |
298 | 296 | ||
299 | /** | 297 | /** |
300 | * Size of the 'bf' (in bytes). | 298 | * Head of list of requests performed on behalf |
299 | * of this client right now. | ||
301 | */ | 300 | */ |
302 | size_t bf_size; | 301 | struct ClientRequestList *rl_head; |
303 | 302 | ||
304 | /** | 303 | /** |
305 | * In what ways are we going to process | 304 | * Tail of list of requests performed on behalf |
306 | * the request? | 305 | * of this client right now. |
307 | */ | 306 | */ |
308 | enum RoutingPolicy policy; | 307 | struct ClientRequestList *rl_tail; |
309 | 308 | ||
310 | /** | 309 | /** |
311 | * Time-to-live for the request (value | 310 | * Head of linked list of responses. |
312 | * we use). | ||
313 | */ | 311 | */ |
314 | int32_t ttl; | 312 | struct ClientResponseMessage *res_head; |
315 | 313 | ||
316 | /** | 314 | /** |
317 | * Number to mingle hashes for bloom-filter | 315 | * Tail of linked list of responses. |
318 | * tests with. | ||
319 | */ | 316 | */ |
320 | int32_t mingle; | 317 | struct ClientResponseMessage *res_tail; |
321 | 318 | ||
322 | /** | 319 | /** |
323 | * Number of results that were found so far. | 320 | * Context for sending replies. |
324 | */ | 321 | */ |
325 | unsigned int results_found; | 322 | struct GNUNET_CONNECTION_TransmitHandle *th; |
323 | |||
326 | }; | 324 | }; |
327 | 325 | ||
328 | 326 | ||
329 | /** | 327 | /** |
330 | * Information we keep for each pending reply. The | 328 | * Hash map entry of requests we are performing |
331 | * actual message follows at the end of this struct. | 329 | * on behalf of the same peer. |
332 | */ | 330 | */ |
333 | struct PendingMessage | 331 | struct PeerRequestEntry |
334 | { | 332 | { |
335 | /** | ||
336 | * This is a linked list. | ||
337 | */ | ||
338 | struct PendingMessage *next; | ||
339 | 333 | ||
340 | /** | 334 | /** |
341 | * Size of the reply; actual reply message follows | 335 | * Request this entry represents. |
342 | * at the end of this struct. | ||
343 | */ | 336 | */ |
344 | size_t msize; | 337 | struct PendingRequest *req; |
345 | 338 | ||
346 | /** | 339 | /** |
347 | * How important is this message for us? | 340 | * Entry of peer responsible for this entry. |
348 | */ | 341 | */ |
349 | uint32_t priority; | 342 | struct ConnectedPeer *cp; |
350 | 343 | ||
351 | }; | 344 | }; |
352 | 345 | ||
353 | 346 | ||
354 | /** | 347 | /** |
355 | * All requests from a client are kept in a doubly-linked list. | 348 | * Doubly-linked list of messages we are performing |
349 | * due to a pending request. | ||
356 | */ | 350 | */ |
357 | struct ClientRequestList; | 351 | struct PendingMessageList |
352 | { | ||
353 | |||
354 | /** | ||
355 | * This is a doubly-linked list of messages on behalf of the same request. | ||
356 | */ | ||
357 | struct PendingMessageList *next; | ||
358 | |||
359 | /** | ||
360 | * This is a doubly-linked list of messages on behalf of the same request. | ||
361 | */ | ||
362 | struct PendingMessageList *prev; | ||
363 | |||
364 | /** | ||
365 | * Message this entry represents. | ||
366 | */ | ||
367 | struct PendingMessage *pm; | ||
368 | |||
369 | /** | ||
370 | * Request this entry belongs to. | ||
371 | */ | ||
372 | struct PendingRequest *req; | ||
373 | |||
374 | /** | ||
375 | * Peer this message is targeted for. | ||
376 | */ | ||
377 | struct ConnectedPeer *target; | ||
378 | |||
379 | }; | ||
358 | 380 | ||
359 | 381 | ||
360 | /** | 382 | /** |
@@ -366,23 +388,23 @@ struct PendingRequest | |||
366 | { | 388 | { |
367 | 389 | ||
368 | /** | 390 | /** |
369 | * ID of a client making a request, NULL if this entry is for a | 391 | * If this request was made by a client, this is our entry in the |
370 | * peer. | 392 | * client request list; otherwise NULL. |
371 | */ | 393 | */ |
372 | struct GNUNET_SERVER_Client *client; | 394 | struct ClientRequestList *client_request_list; |
373 | 395 | ||
374 | /** | 396 | /** |
375 | * If this request was made by a client, | 397 | * If this request was made by a peer, this is our entry in the |
376 | * this is our entry in the client request | 398 | * per-peer multi-hash map; otherwise NULL. |
377 | * list; otherwise NULL. | ||
378 | */ | 399 | */ |
379 | struct ClientRequestList *crl_entry; | 400 | struct PeerRequestEntry *pht_entry; |
380 | 401 | ||
381 | /** | 402 | /** |
382 | * If this is a namespace query, pointer to the hash of the public | 403 | * If this is a namespace query, pointer to the hash of the public |
383 | * key of the namespace; otherwise NULL. | 404 | * key of the namespace; otherwise NULL. Pointer will be to the |
405 | * end of this struct (so no need to free it). | ||
384 | */ | 406 | */ |
385 | GNUNET_HashCode *namespace; | 407 | const GNUNET_HashCode *namespace; |
386 | 408 | ||
387 | /** | 409 | /** |
388 | * Bloomfilter we use to filter out replies that we don't care about | 410 | * Bloomfilter we use to filter out replies that we don't care about |
@@ -396,36 +418,29 @@ struct PendingRequest | |||
396 | struct GNUNET_CORE_InformationRequestContext *irc; | 418 | struct GNUNET_CORE_InformationRequestContext *irc; |
397 | 419 | ||
398 | /** | 420 | /** |
399 | * Handle for an active request for transmission to this peer, or | 421 | * Hash code of all replies that we have seen so far (only valid |
400 | * NULL. Only used for replies that we are trying to send to a peer | 422 | * if client is not NULL since we only track replies like this for |
401 | * that we are not yet connected to. | 423 | * our own clients). |
402 | */ | ||
403 | struct GNUNET_CORE_TransmitHandle *cth; | ||
404 | |||
405 | /** | ||
406 | * Replies that we have received but were unable to forward yet | ||
407 | * (typically non-null only if we have a pending transmission | ||
408 | * request with the client or the respective peer). | ||
409 | */ | 424 | */ |
410 | struct PendingMessage *replies_pending; | 425 | GNUNET_HashCode *replies_seen; |
411 | 426 | ||
412 | /** | 427 | /** |
413 | * Pending transmission request for the target client (for processing of | 428 | * Node in the heap representing this entry; NULL |
414 | * 'replies_pending'). | 429 | * if we have no heap node. |
415 | */ | 430 | */ |
416 | struct GNUNET_CONNECTION_TransmitHandle *th; | 431 | struct GNUNET_CONTAINER_HeapNode *hnode; |
417 | 432 | ||
418 | /** | 433 | /** |
419 | * Hash code of all replies that we have seen so far (only valid | 434 | * Head of list of messages being performed on behalf of this |
420 | * if client is not NULL since we only track replies like this for | 435 | * request. |
421 | * our own clients). | ||
422 | */ | 436 | */ |
423 | GNUNET_HashCode *replies_seen; | 437 | struct PendingMessageList *pending_head; |
424 | 438 | ||
425 | /** | 439 | /** |
426 | * Node in the heap representing this entry. | 440 | * Tail of list of messages being performed on behalf of this |
441 | * request. | ||
427 | */ | 442 | */ |
428 | struct GNUNET_CONTAINER_HeapNode *hnode; | 443 | struct PendingMessageList *pending_tail; |
429 | 444 | ||
430 | /** | 445 | /** |
431 | * When did we first see this request (form this peer), or, if our | 446 | * When did we first see this request (form this peer), or, if our |
@@ -445,12 +460,6 @@ struct PendingRequest | |||
445 | GNUNET_SCHEDULER_TaskIdentifier task; | 460 | GNUNET_SCHEDULER_TaskIdentifier task; |
446 | 461 | ||
447 | /** | 462 | /** |
448 | * (Interned) Peer identifier (only valid if "client" is NULL) | ||
449 | * that identifies a peer that gave us this request. | ||
450 | */ | ||
451 | GNUNET_PEER_Id source_pid; | ||
452 | |||
453 | /** | ||
454 | * (Interned) Peer identifier that identifies a preferred target | 463 | * (Interned) Peer identifier that identifies a preferred target |
455 | * for requests. | 464 | * for requests. |
456 | */ | 465 | */ |
@@ -461,6 +470,12 @@ struct PendingRequest | |||
461 | * received our query for this content. | 470 | * received our query for this content. |
462 | */ | 471 | */ |
463 | GNUNET_PEER_Id *used_pids; | 472 | GNUNET_PEER_Id *used_pids; |
473 | |||
474 | /** | ||
475 | * Our entry in the DRQ (non-NULL while we wait for our | ||
476 | * turn to interact with the local database). | ||
477 | */ | ||
478 | struct DatastoreRequestQueue *drq; | ||
464 | 479 | ||
465 | /** | 480 | /** |
466 | * Size of the 'bf' (in bytes). | 481 | * Size of the 'bf' (in bytes). |
@@ -483,6 +498,11 @@ struct PendingRequest | |||
483 | unsigned int used_pids_size; | 498 | unsigned int used_pids_size; |
484 | 499 | ||
485 | /** | 500 | /** |
501 | * Number of results found for this request. | ||
502 | */ | ||
503 | unsigned int results_found; | ||
504 | |||
505 | /** | ||
486 | * How many entries in "replies_seen" are actually valid? | 506 | * How many entries in "replies_seen" are actually valid? |
487 | */ | 507 | */ |
488 | unsigned int replies_seen_off; | 508 | unsigned int replies_seen_off; |
@@ -527,422 +547,529 @@ struct PendingRequest | |||
527 | 547 | ||
528 | 548 | ||
529 | /** | 549 | /** |
530 | * All requests from a client are kept in a doubly-linked list. | 550 | * Our scheduler. |
531 | */ | 551 | */ |
532 | struct ClientRequestList | 552 | static struct GNUNET_SCHEDULER_Handle *sched; |
533 | { | ||
534 | /** | ||
535 | * This is a doubly-linked list. | ||
536 | */ | ||
537 | struct ClientRequestList *next; | ||
538 | |||
539 | /** | ||
540 | * This is a doubly-linked list. | ||
541 | */ | ||
542 | struct ClientRequestList *prev; | ||
543 | |||
544 | /** | ||
545 | * A request from this client. | ||
546 | */ | ||
547 | struct PendingRequest *req; | ||
548 | |||
549 | /** | ||
550 | * Client list with the head and tail of this DLL. | ||
551 | */ | ||
552 | struct ClientList *cl; | ||
553 | }; | ||
554 | |||
555 | 553 | ||
556 | /** | 554 | /** |
557 | * Linked list of all clients that we are currently processing | 555 | * Our configuration. |
558 | * requests for. | ||
559 | */ | 556 | */ |
560 | struct ClientList | 557 | const struct GNUNET_CONFIGURATION_Handle *cfg; |
561 | { | ||
562 | |||
563 | /** | ||
564 | * This is a linked list. | ||
565 | */ | ||
566 | struct ClientList *next; | ||
567 | |||
568 | /** | ||
569 | * What client is this entry for? | ||
570 | */ | ||
571 | struct GNUNET_SERVER_Client* client; | ||
572 | |||
573 | /** | ||
574 | * Head of the DLL of requests from this client. | ||
575 | */ | ||
576 | struct ClientRequestList *head; | ||
577 | |||
578 | /** | ||
579 | * Tail of the DLL of requests from this client. | ||
580 | */ | ||
581 | struct ClientRequestList *tail; | ||
582 | |||
583 | }; | ||
584 | |||
585 | 558 | ||
586 | /** | 559 | /** |
587 | * Closure for "process_reply" function. | 560 | * Map of peer identifiers to "struct ConnectedPeer" (for that peer). |
588 | */ | 561 | */ |
589 | struct ProcessReplyClosure | 562 | static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; |
590 | { | ||
591 | /** | ||
592 | * The data for the reply. | ||
593 | */ | ||
594 | const void *data; | ||
595 | |||
596 | /** | ||
597 | * When the reply expires. | ||
598 | */ | ||
599 | struct GNUNET_TIME_Absolute expiration; | ||
600 | |||
601 | /** | ||
602 | * Size of data. | ||
603 | */ | ||
604 | size_t size; | ||
605 | |||
606 | /** | ||
607 | * Namespace that this reply belongs to | ||
608 | * (if it is of type SBLOCK). | ||
609 | */ | ||
610 | GNUNET_HashCode namespace; | ||
611 | |||
612 | /** | ||
613 | * Type of the block. | ||
614 | */ | ||
615 | uint32_t type; | ||
616 | |||
617 | /** | ||
618 | * How much was this reply worth to us? | ||
619 | */ | ||
620 | uint32_t priority; | ||
621 | }; | ||
622 | |||
623 | 563 | ||
624 | /** | 564 | /** |
625 | * Information about a peer that we are connected to. | 565 | * Map of peer identifiers to "struct PendingRequest" (for that peer). |
626 | * We track data that is useful for determining which | ||
627 | * peers should receive our requests. | ||
628 | */ | 566 | */ |
629 | struct ConnectedPeer | 567 | static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map; |
630 | { | ||
631 | |||
632 | /** | ||
633 | * List of the last clients for which this peer | ||
634 | * successfully answered a query. | ||
635 | */ | ||
636 | struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE]; | ||
637 | |||
638 | /** | ||
639 | * List of the last PIDs for which | ||
640 | * this peer successfully answered a query; | ||
641 | * We use 0 to indicate no successful reply. | ||
642 | */ | ||
643 | GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE]; | ||
644 | |||
645 | /** | ||
646 | * Average delay between sending the peer a request and | ||
647 | * getting a reply (only calculated over the requests for | ||
648 | * which we actually got a reply). Calculated | ||
649 | * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n | ||
650 | */ | ||
651 | struct GNUNET_TIME_Relative avg_delay; | ||
652 | |||
653 | /** | ||
654 | * Handle for an active request for transmission to this | ||
655 | * peer, or NULL. | ||
656 | */ | ||
657 | struct GNUNET_CORE_TransmitHandle *cth; | ||
658 | |||
659 | /** | ||
660 | * Messages (replies, queries, content migration) we would like to | ||
661 | * send to this peer in the near future. Sorted by priority. | ||
662 | */ | ||
663 | struct PendingMessage *pending_messages; | ||
664 | |||
665 | /** | ||
666 | * Average priority of successful replies. Calculated | ||
667 | * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n | ||
668 | */ | ||
669 | double avg_priority; | ||
670 | |||
671 | /** | ||
672 | * The peer's identity. | ||
673 | */ | ||
674 | GNUNET_PEER_Id pid; | ||
675 | |||
676 | /** | ||
677 | * Number of requests we have currently pending with this peer (that | ||
678 | * is, requests that were transmitted so recently that we would not | ||
679 | * retransmit them right now). | ||
680 | */ | ||
681 | unsigned int pending_requests; | ||
682 | |||
683 | /** | ||
684 | * Which offset in "last_p2p_replies" will be updated next? | ||
685 | * (we go round-robin). | ||
686 | */ | ||
687 | unsigned int last_p2p_replies_woff; | ||
688 | |||
689 | /** | ||
690 | * Which offset in "last_client_replies" will be updated next? | ||
691 | * (we go round-robin). | ||
692 | */ | ||
693 | unsigned int last_client_replies_woff; | ||
694 | |||
695 | }; | ||
696 | |||
697 | 568 | ||
698 | /** | 569 | /** |
699 | * Our connection to the datastore. | 570 | * Map of query identifiers to "struct PendingRequest" (for that query). |
700 | */ | 571 | */ |
701 | static struct GNUNET_DATASTORE_Handle *dsh; | 572 | static struct GNUNET_CONTAINER_MultiHashMap *query_request_map; |
702 | 573 | ||
703 | /** | 574 | /** |
704 | * Our scheduler. | 575 | * Heap with the request that will expire next at the top. Contains |
576 | * pointers of type "struct PendingRequest*"; these will *also* be | ||
577 | * aliased from the "requests_by_peer" data structures and the | ||
578 | * "requests_by_query" table. Note that requests from our clients | ||
579 | * don't expire and are thus NOT in the "requests_by_expiration" | ||
580 | * (or the "requests_by_peer" tables). | ||
705 | */ | 581 | */ |
706 | static struct GNUNET_SCHEDULER_Handle *sched; | 582 | static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; |
707 | 583 | ||
708 | /** | 584 | /** |
709 | * Our configuration. | 585 | * Linked list of clients we are currently processing requests for. |
710 | */ | 586 | */ |
711 | const struct GNUNET_CONFIGURATION_Handle *cfg; | 587 | struct ClientList *client_list; |
712 | 588 | ||
713 | /** | 589 | /** |
714 | * Handle to the core service (NULL until we've connected to it). | 590 | * Pointer to handle to the core service (points to NULL until we've |
591 | * connected to it). | ||
715 | */ | 592 | */ |
716 | struct GNUNET_CORE_Handle *core; | 593 | struct GNUNET_CORE_Handle *core; |
717 | 594 | ||
718 | /** | ||
719 | * Head of doubly-linked LGC list. | ||
720 | */ | ||
721 | static struct LocalGetContext *lgc_head; | ||
722 | 595 | ||
723 | /** | 596 | /* ******************* clean up functions ************************ */ |
724 | * Tail of doubly-linked LGC list. | ||
725 | */ | ||
726 | static struct LocalGetContext *lgc_tail; | ||
727 | 597 | ||
728 | /** | ||
729 | * Head of request queue for the datastore, sorted by timeout. | ||
730 | */ | ||
731 | static struct DatastoreRequestQueue *drq_head; | ||
732 | 598 | ||
733 | /** | 599 | /** |
734 | * Tail of request queue for the datastore. | 600 | * We're done with a particular message list entry. |
601 | * Free all associated resources. | ||
602 | * | ||
603 | * @param pml entry to destroy | ||
735 | */ | 604 | */ |
736 | static struct DatastoreRequestQueue *drq_tail; | 605 | static void |
606 | destroy_pending_message_list_entry (struct PendingMessageList *pml) | ||
607 | { | ||
608 | GNUNET_CONTAINER_DLL_remove (pml->req->pending_head, | ||
609 | pml->req->pending_tail, | ||
610 | pml); | ||
611 | GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head, | ||
612 | pml->target->pending_messages_tail, | ||
613 | pml->pm); | ||
614 | pml->target->pending_requests--; | ||
615 | GNUNET_free (pml->pm); | ||
616 | GNUNET_free (pml); | ||
617 | } | ||
737 | 618 | ||
738 | /** | ||
739 | * Map of query hash codes to requests. | ||
740 | */ | ||
741 | static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query; | ||
742 | 619 | ||
743 | /** | 620 | /** |
744 | * Map of peer IDs to requests (for those requests coming | 621 | * Destroy the given pending message (and call the respective |
745 | * from other peers). | 622 | * continuation). |
623 | * | ||
624 | * @param pm message to destroy | ||
625 | * @param tpid id of peer that the message was delivered to, or 0 for none | ||
746 | */ | 626 | */ |
747 | static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer; | 627 | static void |
628 | destroy_pending_message (struct PendingMessage *pm, | ||
629 | GNUNET_PEER_Id tpid) | ||
630 | { | ||
631 | struct PendingMessageList *pml = pm->pml; | ||
632 | |||
633 | GNUNET_assert (pml->pm == pm); | ||
634 | GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); | ||
635 | pm->cont (pm->cont_cls, 0); | ||
636 | destroy_pending_message_list_entry (pml); | ||
637 | } | ||
638 | |||
748 | 639 | ||
749 | /** | ||
750 | * Linked list of all of our clients and their requests. | ||
751 | */ | ||
752 | static struct ClientList *clients; | ||
753 | 640 | ||
754 | /** | 641 | /** |
755 | * Heap with the request that will expire next at the top. Contains | 642 | * We're done processing a particular request. |
756 | * pointers of type "struct PendingRequest*"; these will *also* be | 643 | * Free all associated resources. |
757 | * aliased from the "requests_by_peer" data structures and the | 644 | * |
758 | * "requests_by_query" table. Note that requests from our clients | 645 | * @param pr request to destroy |
759 | * don't expire and are thus NOT in the "requests_by_expiration" | ||
760 | * (or the "requests_by_peer" tables). | ||
761 | */ | 646 | */ |
762 | static struct GNUNET_CONTAINER_Heap *requests_by_expiration; | 647 | static void |
648 | destroy_pending_request (struct PendingRequest *pr) | ||
649 | { | ||
650 | struct GNUNET_PeerIdentity pid; | ||
651 | |||
652 | if (pr->hnode != NULL) | ||
653 | { | ||
654 | GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, | ||
655 | pr->hnode); | ||
656 | pr->hnode = NULL; | ||
657 | } | ||
658 | /* might have already been removed from map | ||
659 | in 'process_reply' if there was a unique | ||
660 | reply; hence ignore the return value here */ | ||
661 | (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map, | ||
662 | &pr->query, | ||
663 | pr); | ||
664 | if (pr->drq != NULL) | ||
665 | { | ||
666 | GNUNET_FS_drq_get_cancel (pr->drq); | ||
667 | pr->drq = NULL; | ||
668 | } | ||
669 | if (pr->client_request_list != NULL) | ||
670 | { | ||
671 | GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head, | ||
672 | pr->client_request_list->client_list->rl_tail, | ||
673 | pr->client_request_list); | ||
674 | GNUNET_free (pr->client_request_list); | ||
675 | pr->client_request_list = NULL; | ||
676 | } | ||
677 | if (pr->pht_entry != NULL) | ||
678 | { | ||
679 | GNUNET_PEER_resolve (pr->pht_entry->cp->pid, | ||
680 | &pid); | ||
681 | GNUNET_CONTAINER_multihashmap_remove (peer_request_map, | ||
682 | &pid.hashPubKey, | ||
683 | pr->pht_entry); | ||
684 | GNUNET_free (pr->pht_entry); | ||
685 | pr->pht_entry = NULL; | ||
686 | } | ||
687 | if (pr->bf != NULL) | ||
688 | { | ||
689 | GNUNET_CONTAINER_bloomfilter_free (pr->bf); | ||
690 | pr->bf = NULL; | ||
691 | } | ||
692 | if (pr->irc != NULL) | ||
693 | { | ||
694 | GNUNET_CORE_peer_change_preference_cancel (pr->irc); | ||
695 | pr->irc = NULL; | ||
696 | } | ||
697 | if (pr->replies_seen != NULL) | ||
698 | { | ||
699 | GNUNET_free (pr->replies_seen); | ||
700 | pr->replies_seen = NULL; | ||
701 | } | ||
702 | if (pr->task != GNUNET_SCHEDULER_NO_TASK) | ||
703 | { | ||
704 | GNUNET_SCHEDULER_cancel (sched, | ||
705 | pr->task); | ||
706 | pr->task = GNUNET_SCHEDULER_NO_TASK; | ||
707 | } | ||
708 | while (NULL != pr->pending_head) | ||
709 | destroy_pending_message_list_entry (pr->pending_head); | ||
710 | GNUNET_PEER_change_rc (pr->target_pid, -1); | ||
711 | if (pr->used_pids != NULL) | ||
712 | { | ||
713 | GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); | ||
714 | GNUNET_free (pr->used_pids); | ||
715 | pr->used_pids_off = 0; | ||
716 | pr->used_pids_size = 0; | ||
717 | pr->used_pids = NULL; | ||
718 | } | ||
719 | GNUNET_free (pr); | ||
720 | } | ||
721 | |||
763 | 722 | ||
764 | /** | 723 | /** |
765 | * Map of peer identifiers to "struct ConnectedPeer" (for that peer). | 724 | * Method called whenever a given peer connects. |
725 | * | ||
726 | * @param cls closure, not used | ||
727 | * @param peer peer identity this notification is about | ||
728 | * @param latency reported latency of the connection with 'other' | ||
729 | * @param distance reported distance (DV) to 'other' | ||
766 | */ | 730 | */ |
767 | static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; | 731 | static void |
732 | peer_connect_handler (void *cls, | ||
733 | const struct | ||
734 | GNUNET_PeerIdentity * peer, | ||
735 | struct GNUNET_TIME_Relative latency, | ||
736 | uint32_t distance) | ||
737 | { | ||
738 | struct ConnectedPeer *cp; | ||
739 | |||
740 | cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); | ||
741 | cp->pid = GNUNET_PEER_intern (peer); | ||
742 | GNUNET_CONTAINER_multihashmap_put (connected_peers, | ||
743 | &peer->hashPubKey, | ||
744 | cp, | ||
745 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
746 | } | ||
747 | |||
768 | 748 | ||
769 | /** | 749 | /** |
770 | * Maximum number of requests (from other peers) that we're | 750 | * Free (each) request made by the peer. |
771 | * willing to have pending at any given point in time. | 751 | * |
772 | * FIXME: set from configuration (and 32 is a tiny value for testing only). | 752 | * @param cls closure, points to peer that the request belongs to |
753 | * @param key current key code | ||
754 | * @param value value in the hash map | ||
755 | * @return GNUNET_YES (we should continue to iterate) | ||
773 | */ | 756 | */ |
774 | static uint64_t max_pending_requests = 32; | 757 | static int |
775 | 758 | destroy_request (void *cls, | |
759 | const GNUNET_HashCode * key, | ||
760 | void *value) | ||
761 | { | ||
762 | const struct GNUNET_PeerIdentity * peer = cls; | ||
763 | struct PendingRequest *pr = value; | ||
764 | |||
765 | GNUNET_CONTAINER_multihashmap_remove (peer_request_map, | ||
766 | &peer->hashPubKey, | ||
767 | pr); | ||
768 | destroy_pending_request (pr); | ||
769 | return GNUNET_YES; | ||
770 | } | ||
776 | 771 | ||
777 | 772 | ||
778 | /** | 773 | /** |
779 | * Run the next DS request in our | 774 | * Method called whenever a peer disconnects. |
780 | * queue, we're done with the current one. | 775 | * |
776 | * @param cls closure, not used | ||
777 | * @param peer peer identity this notification is about | ||
781 | */ | 778 | */ |
782 | static void | 779 | static void |
783 | next_ds_request () | 780 | peer_disconnect_handler (void *cls, |
781 | const struct | ||
782 | GNUNET_PeerIdentity * peer) | ||
784 | { | 783 | { |
785 | struct DatastoreRequestQueue *e; | 784 | struct ConnectedPeer *cp; |
786 | 785 | struct PendingMessage *pm; | |
787 | while (NULL != (e = drq_head)) | 786 | unsigned int i; |
787 | |||
788 | GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map, | ||
789 | &peer->hashPubKey, | ||
790 | &destroy_request, | ||
791 | (void*) peer); | ||
792 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
793 | &peer->hashPubKey); | ||
794 | if (cp == NULL) | ||
795 | return; | ||
796 | for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++) | ||
788 | { | 797 | { |
789 | if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) | 798 | if (NULL != cp->last_client_replies[i]) |
790 | break; | 799 | { |
791 | if (e->task != GNUNET_SCHEDULER_NO_TASK) | 800 | GNUNET_SERVER_client_drop (cp->last_client_replies[i]); |
792 | GNUNET_SCHEDULER_cancel (sched, e->task); | 801 | cp->last_client_replies[i] = NULL; |
793 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | 802 | } |
794 | e->req (e->req_cls, GNUNET_NO); | ||
795 | GNUNET_free (e); | ||
796 | } | 803 | } |
797 | if (e == NULL) | 804 | GNUNET_CONTAINER_multihashmap_remove (connected_peers, |
798 | return; | 805 | &peer->hashPubKey, |
799 | if (e->task != GNUNET_SCHEDULER_NO_TASK) | 806 | cp); |
800 | GNUNET_SCHEDULER_cancel (sched, e->task); | 807 | GNUNET_PEER_change_rc (cp->pid, -1); |
801 | e->task = GNUNET_SCHEDULER_NO_TASK; | 808 | GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); |
802 | e->req (e->req_cls, GNUNET_YES); | 809 | if (NULL != cp->cth) |
803 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | 810 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); |
804 | GNUNET_free (e); | 811 | while (NULL != (pm = cp->pending_messages_head)) |
812 | destroy_pending_message (pm, 0 /* delivery failed */); | ||
813 | GNUNET_break (0 == cp->pending_requests); | ||
814 | GNUNET_free (cp); | ||
805 | } | 815 | } |
806 | 816 | ||
807 | 817 | ||
808 | /** | 818 | /** |
809 | * A datastore request had to be timed out. | 819 | * Iterator over hash map entries that removes all occurences |
820 | * of the given 'client' from the 'last_client_replies' of the | ||
821 | * given connected peer. | ||
810 | * | 822 | * |
811 | * @param cls closure (of type "struct DatastoreRequestQueue*") | 823 | * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove |
812 | * @param tc task context, unused | 824 | * @param key current key code (unused) |
825 | * @param value value in the hash map (the 'struct ConnectedPeer*' to change) | ||
826 | * @return GNUNET_YES (we should continue to iterate) | ||
813 | */ | 827 | */ |
814 | static void | 828 | static int |
815 | timeout_ds_request (void *cls, | 829 | remove_client_from_last_client_replies (void *cls, |
816 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 830 | const GNUNET_HashCode * key, |
831 | void *value) | ||
817 | { | 832 | { |
818 | struct DatastoreRequestQueue *e = cls; | 833 | struct GNUNET_SERVER_Client *client = cls; |
834 | struct ConnectedPeer *cp = value; | ||
835 | unsigned int i; | ||
819 | 836 | ||
820 | e->task = GNUNET_SCHEDULER_NO_TASK; | 837 | for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++) |
821 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | 838 | { |
822 | e->req (e->req_cls, GNUNET_NO); | 839 | if (cp->last_client_replies[i] == client) |
823 | GNUNET_free (e); | 840 | { |
841 | GNUNET_SERVER_client_drop (cp->last_client_replies[i]); | ||
842 | cp->last_client_replies[i] = NULL; | ||
843 | } | ||
844 | } | ||
845 | return GNUNET_YES; | ||
824 | } | 846 | } |
825 | 847 | ||
826 | 848 | ||
827 | /** | 849 | /** |
828 | * Queue a request for the datastore. | 850 | * A client disconnected. Remove all of its pending queries. |
829 | * | 851 | * |
830 | * @param deadline by when the request should run | 852 | * @param cls closure, NULL |
831 | * @param fun function to call once the request can be run | 853 | * @param client identification of the client |
832 | * @param fun_cls closure for fun | ||
833 | */ | 854 | */ |
834 | static struct DatastoreRequestQueue * | 855 | static void |
835 | queue_ds_request (struct GNUNET_TIME_Relative deadline, | 856 | handle_client_disconnect (void *cls, |
836 | RequestFunction fun, | 857 | struct GNUNET_SERVER_Client |
837 | void *fun_cls) | 858 | * client) |
838 | { | 859 | { |
839 | struct DatastoreRequestQueue *e; | 860 | struct ClientList *pos; |
840 | struct DatastoreRequestQueue *bef; | 861 | struct ClientList *prev; |
862 | struct ClientRequestList *rcl; | ||
863 | struct ClientResponseMessage *creply; | ||
841 | 864 | ||
842 | if (drq_head == NULL) | 865 | if (client == NULL) |
843 | { | 866 | return; /* huh? is this allowed? */ |
844 | /* no other requests pending, run immediately */ | 867 | prev = NULL; |
845 | fun (fun_cls, GNUNET_OK); | 868 | pos = client_list; |
846 | return NULL; | 869 | while ( (NULL != pos) && |
847 | } | 870 | (pos->client != client) ) |
848 | e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); | ||
849 | e->timeout = GNUNET_TIME_relative_to_absolute (deadline); | ||
850 | e->req = fun; | ||
851 | e->req_cls = fun_cls; | ||
852 | if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) | ||
853 | { | 871 | { |
854 | /* local request, highest prio, put at head of queue | 872 | prev = pos; |
855 | regardless of deadline */ | 873 | pos = pos->next; |
856 | bef = NULL; | ||
857 | } | 874 | } |
875 | if (pos == NULL) | ||
876 | return; /* no requests pending for this client */ | ||
877 | while (NULL != (rcl = pos->rl_head)) | ||
878 | destroy_pending_request (rcl->req); | ||
879 | if (prev == NULL) | ||
880 | client_list = pos->next; | ||
858 | else | 881 | else |
882 | prev->next = pos->next; | ||
883 | if (pos->th != NULL) | ||
859 | { | 884 | { |
860 | bef = drq_tail; | 885 | GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); |
861 | while ( (NULL != bef) && | 886 | pos->th = NULL; |
862 | (e->timeout.value < bef->timeout.value) ) | ||
863 | bef = bef->prev; | ||
864 | } | 887 | } |
865 | GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); | 888 | while (NULL != (creply = pos->res_head)) |
866 | if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) | 889 | { |
867 | return e; | 890 | GNUNET_CONTAINER_DLL_remove (pos->res_head, |
868 | e->task = GNUNET_SCHEDULER_add_delayed (sched, | 891 | pos->res_tail, |
869 | deadline, | 892 | creply); |
870 | &timeout_ds_request, | 893 | GNUNET_free (creply); |
871 | e); | 894 | } |
872 | return e; | 895 | GNUNET_SERVER_client_drop (pos->client); |
896 | GNUNET_free (pos); | ||
897 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | ||
898 | &remove_client_from_last_client_replies, | ||
899 | client); | ||
873 | } | 900 | } |
874 | 901 | ||
875 | 902 | ||
876 | /** | 903 | /** |
877 | * Free the state associated with a local get context. | 904 | * Iterator to free peer entries. |
878 | * | 905 | * |
879 | * @param lgc the lgc to free | 906 | * @param cls closure, unused |
907 | * @param key current key code | ||
908 | * @param value value in the hash map (peer entry) | ||
909 | * @return GNUNET_YES (we should continue to iterate) | ||
910 | */ | ||
911 | static int | ||
912 | clean_peer (void *cls, | ||
913 | const GNUNET_HashCode * key, | ||
914 | void *value) | ||
915 | { | ||
916 | peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key); | ||
917 | return GNUNET_YES; | ||
918 | } | ||
919 | |||
920 | |||
921 | /** | ||
922 | * Task run during shutdown. | ||
923 | * | ||
924 | * @param cls unused | ||
925 | * @param tc unused | ||
880 | */ | 926 | */ |
881 | static void | 927 | static void |
882 | local_get_context_free (struct LocalGetContext *lgc) | 928 | shutdown_task (void *cls, |
929 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
883 | { | 930 | { |
884 | GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); | 931 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, |
885 | GNUNET_SERVER_client_drop (lgc->client); | 932 | &clean_peer, |
886 | GNUNET_free_non_null (lgc->results); | 933 | NULL); |
887 | if (lgc->results_bf != NULL) | 934 | GNUNET_CONTAINER_multihashmap_destroy (connected_peers); |
888 | GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf); | 935 | connected_peers = NULL; |
889 | if (lgc->req != NULL) | 936 | while (client_list != NULL) |
890 | { | 937 | handle_client_disconnect (NULL, |
891 | if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK) | 938 | client_list->client); |
892 | GNUNET_SCHEDULER_cancel (sched, lgc->req->task); | 939 | GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap)); |
893 | GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); | 940 | GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); |
894 | GNUNET_free (lgc->req); | 941 | requests_by_expiration_heap = 0; |
895 | } | 942 | GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map)); |
896 | GNUNET_free (lgc); | 943 | GNUNET_CONTAINER_multihashmap_destroy (query_request_map); |
944 | query_request_map = NULL; | ||
945 | GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map)); | ||
946 | GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); | ||
947 | peer_request_map = NULL; | ||
948 | GNUNET_assert (NULL != core); | ||
949 | GNUNET_CORE_disconnect (core); | ||
950 | core = NULL; | ||
951 | sched = NULL; | ||
952 | cfg = NULL; | ||
897 | } | 953 | } |
898 | 954 | ||
899 | 955 | ||
956 | /* ******************* Utility functions ******************** */ | ||
957 | |||
958 | |||
900 | /** | 959 | /** |
901 | * We're able to transmit the next (local) result to the client. | 960 | * Transmit the given message by copying it to the target buffer |
902 | * Do it and ask the datastore for more. Or, on error, tell | 961 | * "buf". "buf" will be NULL and "size" zero if the socket was closed |
903 | * the datastore to stop giving us more. | 962 | * for writing in the meantime. In that case, do nothing |
963 | * (the disconnect or shutdown handler will take care of the rest). | ||
964 | * If we were able to transmit messages and there are still more | ||
965 | * pending, ask core again for further calls to this function. | ||
904 | * | 966 | * |
905 | * @param cls our closure (struct LocalGetContext) | 967 | * @param cls closure, pointer to the 'struct ConnectedPeer*' |
906 | * @param max maximum number of bytes we can transmit | 968 | * @param size number of bytes available in buf |
907 | * @param buf where to copy our message | 969 | * @param buf where the callee should write the message |
908 | * @return number of bytes copied to buf | 970 | * @return number of bytes written to buf |
909 | */ | 971 | */ |
910 | static size_t | 972 | static size_t |
911 | transmit_local_result (void *cls, | 973 | transmit_to_peer (void *cls, |
912 | size_t max, | 974 | size_t size, void *buf) |
913 | void *buf) | ||
914 | { | 975 | { |
915 | struct LocalGetContext *lgc = cls; | 976 | struct ConnectedPeer *cp = cls; |
916 | uint16_t msize; | 977 | char *cbuf = buf; |
917 | 978 | struct GNUNET_PeerIdentity pid; | |
979 | struct PendingMessage *pm; | ||
980 | size_t msize; | ||
981 | |||
982 | cp->cth = NULL; | ||
918 | if (NULL == buf) | 983 | if (NULL == buf) |
919 | { | 984 | { |
920 | #if DEBUG_FS | 985 | #if DEBUG_FS |
921 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 986 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
922 | "Failed to transmit result to local client, aborting datastore iteration.\n"); | 987 | "Dropping reply, core too busy.\n"); |
923 | #endif | 988 | #endif |
924 | /* error, abort! */ | ||
925 | GNUNET_free (lgc->result); | ||
926 | lgc->result = NULL; | ||
927 | GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); | ||
928 | return 0; | 989 | return 0; |
929 | } | 990 | } |
930 | msize = ntohs (lgc->result->header.size); | 991 | msize = 0; |
931 | #if DEBUG_FS | 992 | while ( (NULL != (pm = cp->pending_messages_head) ) && |
932 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 993 | (pm->msize <= size) ) |
933 | "Transmitting %u bytes of result to local client.\n", | 994 | { |
934 | msize); | 995 | memcpy (&cbuf[msize], &pm[1], pm->msize); |
935 | #endif | 996 | msize += pm->msize; |
936 | GNUNET_assert (max >= msize); | 997 | size -= pm->msize; |
937 | memcpy (buf, lgc->result, msize); | 998 | destroy_pending_message (pm, cp->pid); |
938 | GNUNET_free (lgc->result); | 999 | } |
939 | lgc->result = NULL; | 1000 | if (NULL != pm) |
940 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 1001 | { |
1002 | GNUNET_PEER_resolve (cp->pid, | ||
1003 | &pid); | ||
1004 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
1005 | pm->priority, | ||
1006 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1007 | &pid, | ||
1008 | pm->msize, | ||
1009 | &transmit_to_peer, | ||
1010 | pm); | ||
1011 | } | ||
941 | return msize; | 1012 | return msize; |
942 | } | 1013 | } |
943 | 1014 | ||
944 | 1015 | ||
945 | /** | 1016 | /** |
1017 | * Add a message to the set of pending messages for the given peer. | ||
1018 | * | ||
1019 | * @param cp peer to send message to | ||
1020 | * @param pm message to queue | ||
1021 | * @param pr request on which behalf this message is being queued | ||
1022 | */ | ||
1023 | static void | ||
1024 | add_to_pending_messages_for_peer (struct ConnectedPeer *cp, | ||
1025 | struct PendingMessage *pm, | ||
1026 | struct PendingRequest *pr) | ||
1027 | { | ||
1028 | struct PendingMessage *pos; | ||
1029 | struct PendingMessageList *pml; | ||
1030 | struct GNUNET_PeerIdentity pid; | ||
1031 | |||
1032 | GNUNET_assert (pm->next == NULL); | ||
1033 | GNUNET_assert (pm->pml == NULL); | ||
1034 | pml = GNUNET_malloc (sizeof (struct PendingMessageList)); | ||
1035 | pml->req = pr; | ||
1036 | pml->target = cp; | ||
1037 | pml->pm = pm; | ||
1038 | pm->pml = pml; | ||
1039 | GNUNET_CONTAINER_DLL_insert (pr->pending_head, | ||
1040 | pr->pending_tail, | ||
1041 | pml); | ||
1042 | pos = cp->pending_messages_head; | ||
1043 | while ( (pos != NULL) && | ||
1044 | (pm->priority < pos->priority) ) | ||
1045 | pos = pos->next; | ||
1046 | GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head, | ||
1047 | cp->pending_messages_tail, | ||
1048 | pos, | ||
1049 | pm); | ||
1050 | cp->pending_requests++; | ||
1051 | if (cp->pending_requests > MAX_QUEUE_PER_PEER) | ||
1052 | destroy_pending_message (cp->pending_messages_tail, 0); | ||
1053 | if (cp->cth == NULL) | ||
1054 | { | ||
1055 | /* need to schedule transmission */ | ||
1056 | GNUNET_PEER_resolve (cp->pid, &pid); | ||
1057 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
1058 | cp->pending_messages_head->priority, | ||
1059 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1060 | &pid, | ||
1061 | cp->pending_messages_head->msize, | ||
1062 | &transmit_to_peer, | ||
1063 | cp); | ||
1064 | } | ||
1065 | if (cp->cth == NULL) | ||
1066 | { | ||
1067 | /* FIXME: call stats (rare, bad case) */ | ||
1068 | } | ||
1069 | } | ||
1070 | |||
1071 | |||
1072 | /** | ||
946 | * Mingle hash with the mingle_number to produce different bits. | 1073 | * Mingle hash with the mingle_number to produce different bits. |
947 | */ | 1074 | */ |
948 | static void | 1075 | static void |
@@ -960,6 +1087,48 @@ mingle_hash (const GNUNET_HashCode * in, | |||
960 | 1087 | ||
961 | 1088 | ||
962 | /** | 1089 | /** |
1090 | * Test if the load on this peer is too high | ||
1091 | * to even consider processing the query at | ||
1092 | * all. | ||
1093 | * | ||
1094 | * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise | ||
1095 | */ | ||
1096 | static int | ||
1097 | test_load_too_high () | ||
1098 | { | ||
1099 | return GNUNET_NO; // FIXME | ||
1100 | } | ||
1101 | |||
1102 | |||
1103 | /* ******************* Pending Request Refresh Task ******************** */ | ||
1104 | |||
1105 | |||
1106 | /** | ||
1107 | * Function called after we either failed or succeeded | ||
1108 | * at transmitting a query to a peer. | ||
1109 | * | ||
1110 | * @param cls the requests "struct PendingRequest*" | ||
1111 | * @param pid ID of receiving peer, 0 on transmission error | ||
1112 | */ | ||
1113 | static void | ||
1114 | transmit_query_continuation (void *cls, | ||
1115 | GNUNET_PEER_Id tpid) | ||
1116 | { | ||
1117 | struct PendingRequest *pr = cls; | ||
1118 | |||
1119 | if (tpid == 0) | ||
1120 | return; | ||
1121 | GNUNET_PEER_change_rc (tpid, 1); | ||
1122 | if (pr->used_pids_off == pr->used_pids_size) | ||
1123 | GNUNET_array_grow (pr->used_pids, | ||
1124 | pr->used_pids_size, | ||
1125 | pr->used_pids_size * 2 + 2); | ||
1126 | pr->used_pids[pr->used_pids_off++] = tpid; | ||
1127 | } | ||
1128 | |||
1129 | |||
1130 | #if 0 | ||
1131 | /** | ||
963 | * How many bytes should a bloomfilter be if we have already seen | 1132 | * How many bytes should a bloomfilter be if we have already seen |
964 | * entry_count responses? Note that BLOOMFILTER_K gives us the number | 1133 | * entry_count responses? Note that BLOOMFILTER_K gives us the number |
965 | * of bits set per entry. Furthermore, we should not re-size the | 1134 | * of bits set per entry. Furthermore, we should not re-size the |
@@ -1024,75 +1193,15 @@ refresh_bloomfilter (unsigned int count, | |||
1024 | } | 1193 | } |
1025 | return bf; | 1194 | return bf; |
1026 | } | 1195 | } |
1196 | #endif | ||
1027 | 1197 | ||
1028 | 1198 | ||
1029 | /** | 1199 | /** |
1030 | * Closure used for "target_peer_select_cb". | 1200 | * We use a random delay to make the timing of requests less |
1031 | */ | 1201 | * predictable. This function returns such a random delay. |
1032 | struct PeerSelectionContext | ||
1033 | { | ||
1034 | /** | ||
1035 | * The request for which we are selecting | ||
1036 | * peers. | ||
1037 | */ | ||
1038 | struct PendingRequest *pr; | ||
1039 | |||
1040 | /** | ||
1041 | * Current "prime" target. | ||
1042 | */ | ||
1043 | struct GNUNET_PeerIdentity target; | ||
1044 | |||
1045 | /** | ||
1046 | * How much do we like this target? | ||
1047 | */ | ||
1048 | double target_score; | ||
1049 | |||
1050 | }; | ||
1051 | |||
1052 | |||
1053 | /** | ||
1054 | * Function called for each connected peer to determine | ||
1055 | * which one(s) would make good targets for forwarding. | ||
1056 | * | 1202 | * |
1057 | * @param cls closure (struct PeerSelectionContext) | 1203 | * FIXME: make schedule dependent on the specifics of the request? |
1058 | * @param key current key code (peer identity) | 1204 | * Or bandwidth and number of connected peers and load? |
1059 | * @param value value in the hash map (struct ConnectedPeer) | ||
1060 | * @return GNUNET_YES if we should continue to | ||
1061 | * iterate, | ||
1062 | * GNUNET_NO if not. | ||
1063 | */ | ||
1064 | static int | ||
1065 | target_peer_select_cb (void *cls, | ||
1066 | const GNUNET_HashCode * key, | ||
1067 | void *value) | ||
1068 | { | ||
1069 | struct PeerSelectionContext *psc = cls; | ||
1070 | struct ConnectedPeer *cp = value; | ||
1071 | struct PendingRequest *pr = psc->pr; | ||
1072 | double score; | ||
1073 | unsigned int i; | ||
1074 | |||
1075 | /* 1) check if we have already (recently) forwarded to this peer */ | ||
1076 | for (i=0;i<pr->used_pids_off;i++) | ||
1077 | if (pr->used_pids[i] == cp->pid) | ||
1078 | return GNUNET_YES; /* skip */ | ||
1079 | // 2) calculate how much we'd like to forward to this peer | ||
1080 | score = 0; // FIXME! | ||
1081 | |||
1082 | /* store best-fit in closure */ | ||
1083 | if (score > psc->target_score) | ||
1084 | { | ||
1085 | psc->target_score = score; | ||
1086 | psc->target.hashPubKey = *key; | ||
1087 | } | ||
1088 | return GNUNET_YES; | ||
1089 | } | ||
1090 | |||
1091 | |||
1092 | /** | ||
1093 | * We use a random delay to make the timing of requests | ||
1094 | * less predictable. This function returns such a random | ||
1095 | * delay. | ||
1096 | * | 1205 | * |
1097 | * @return random delay to use for some request, between 0 and TTL_DECREMENT ms | 1206 | * @return random delay to use for some request, between 0 and TTL_DECREMENT ms |
1098 | */ | 1207 | */ |
@@ -1106,69 +1215,6 @@ get_processing_delay () | |||
1106 | 1215 | ||
1107 | 1216 | ||
1108 | /** | 1217 | /** |
1109 | * Task that is run for each request with the goal of forwarding the | ||
1110 | * associated query to other peers. The task should re-schedule | ||
1111 | * itself to be re-run once the TTL has expired. (or at a later time | ||
1112 | * if more peers should be queried earlier). | ||
1113 | * | ||
1114 | * @param cls the requests "struct PendingRequest*" | ||
1115 | * @param tc task context (unused) | ||
1116 | */ | ||
1117 | static void | ||
1118 | forward_request_task (void *cls, | ||
1119 | const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
1120 | |||
1121 | |||
1122 | /** | ||
1123 | * We've selected a peer for forwarding of a query. Construct the | ||
1124 | * message and then re-schedule the task to forward again to (other) | ||
1125 | * peers. | ||
1126 | * | ||
1127 | * @param cls closure | ||
1128 | * @param size number of bytes available in buf | ||
1129 | * @param buf where the callee should write the message | ||
1130 | * @return number of bytes written to buf | ||
1131 | */ | ||
1132 | static size_t | ||
1133 | transmit_request_cb (void *cls, | ||
1134 | size_t size, | ||
1135 | void *buf) | ||
1136 | { | ||
1137 | struct ConnectedPeer *cp = cls; | ||
1138 | char *cbuf = buf; | ||
1139 | struct GNUNET_PeerIdentity target; | ||
1140 | struct PendingMessage *pr; | ||
1141 | size_t tot; | ||
1142 | |||
1143 | cp->cth = NULL; | ||
1144 | tot = 0; | ||
1145 | while ( (NULL != (pr = cp->pending_messages)) && | ||
1146 | (pr->msize < size - tot) ) | ||
1147 | { | ||
1148 | memcpy (&cbuf[tot], | ||
1149 | &pr[1], | ||
1150 | pr->msize); | ||
1151 | tot += pr->msize; | ||
1152 | cp->pending_messages = pr->next; | ||
1153 | GNUNET_free (pr); | ||
1154 | } | ||
1155 | if (NULL != pr) | ||
1156 | { | ||
1157 | GNUNET_PEER_resolve (cp->pid, | ||
1158 | &target); | ||
1159 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
1160 | pr->priority, | ||
1161 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1162 | &target, | ||
1163 | pr->msize, | ||
1164 | &transmit_request_cb, | ||
1165 | cp); | ||
1166 | } | ||
1167 | return tot; | ||
1168 | } | ||
1169 | |||
1170 | |||
1171 | /** | ||
1172 | * Function called after we've tried to reserve a certain amount of | 1218 | * Function called after we've tried to reserve a certain amount of |
1173 | * bandwidth for a reply. Check if we succeeded and if so send our | 1219 | * bandwidth for a reply. Check if we succeeded and if so send our |
1174 | * query. | 1220 | * query. |
@@ -1192,18 +1238,12 @@ target_reservation_cb (void *cls, | |||
1192 | struct PendingRequest *pr = cls; | 1238 | struct PendingRequest *pr = cls; |
1193 | struct ConnectedPeer *cp; | 1239 | struct ConnectedPeer *cp; |
1194 | struct PendingMessage *pm; | 1240 | struct PendingMessage *pm; |
1195 | struct PendingMessage *pos; | ||
1196 | struct PendingMessage *prev; | ||
1197 | struct GetMessage *gm; | 1241 | struct GetMessage *gm; |
1198 | GNUNET_HashCode *ext; | 1242 | GNUNET_HashCode *ext; |
1199 | char *bfdata; | 1243 | char *bfdata; |
1200 | size_t msize; | 1244 | size_t msize; |
1201 | unsigned int k; | 1245 | unsigned int k; |
1202 | 1246 | ||
1203 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | ||
1204 | get_processing_delay (), // FIXME: longer? | ||
1205 | &forward_request_task, | ||
1206 | pr); | ||
1207 | pr->irc = NULL; | 1247 | pr->irc = NULL; |
1208 | GNUNET_assert (peer != NULL); | 1248 | GNUNET_assert (peer != NULL); |
1209 | if (amount != DBLOCK_SIZE) | 1249 | if (amount != DBLOCK_SIZE) |
@@ -1211,15 +1251,12 @@ target_reservation_cb (void *cls, | |||
1211 | /* FIXME: call stats... */ | 1251 | /* FIXME: call stats... */ |
1212 | return; /* this target round failed */ | 1252 | return; /* this target round failed */ |
1213 | } | 1253 | } |
1214 | // (2) transmit, update ttl/priority | 1254 | // (3) transmit, update ttl/priority |
1215 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | 1255 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, |
1216 | &peer->hashPubKey); | 1256 | &peer->hashPubKey); |
1217 | if (cp == NULL) | 1257 | if (cp == NULL) |
1218 | { | 1258 | { |
1219 | /* Peer must have just left; try again immediately */ | 1259 | /* Peer must have just left */ |
1220 | pr->task = GNUNET_SCHEDULER_add_now (sched, | ||
1221 | &forward_request_task, | ||
1222 | pr); | ||
1223 | return; | 1260 | return; |
1224 | } | 1261 | } |
1225 | /* build message and insert message into priority queue */ | 1262 | /* build message and insert message into priority queue */ |
@@ -1228,7 +1265,6 @@ target_reservation_cb (void *cls, | |||
1228 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 1265 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); |
1229 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); | 1266 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); |
1230 | pm->msize = msize; | 1267 | pm->msize = msize; |
1231 | pm->priority = 0; // FIXME: calculate priority properly! | ||
1232 | gm = (struct GetMessage*) &pm[1]; | 1268 | gm = (struct GetMessage*) &pm[1]; |
1233 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); | 1269 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); |
1234 | gm->header.size = htons (msize); | 1270 | gm->header.size = htons (msize); |
@@ -1236,784 +1272,570 @@ target_reservation_cb (void *cls, | |||
1236 | pr->remaining_priority /= 2; | 1272 | pr->remaining_priority /= 2; |
1237 | gm->priority = htonl (pr->remaining_priority); | 1273 | gm->priority = htonl (pr->remaining_priority); |
1238 | gm->ttl = htonl (pr->ttl); | 1274 | gm->ttl = htonl (pr->ttl); |
1239 | gm->filter_mutator = htonl(pr->mingle); | 1275 | gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion? |
1240 | gm->hash_bitmap = htonl (42); | 1276 | gm->hash_bitmap = htonl (42); // FIXME! |
1241 | gm->query = pr->query; | 1277 | gm->query = pr->query; |
1242 | ext = (GNUNET_HashCode*) &gm[1]; | 1278 | ext = (GNUNET_HashCode*) &gm[1]; |
1243 | |||
1244 | // FIXME: setup "ext[0]..[k-1]" | 1279 | // FIXME: setup "ext[0]..[k-1]" |
1245 | bfdata = (char *) &ext[k]; | 1280 | bfdata = (char *) &ext[k]; |
1246 | if (pr->bf != NULL) | 1281 | if (pr->bf != NULL) |
1247 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, | 1282 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, |
1248 | bfdata, | 1283 | bfdata, |
1249 | pr->bf_size); | 1284 | pr->bf_size); |
1285 | pm->cont = &transmit_query_continuation; | ||
1286 | pm->cont_cls = pr; | ||
1287 | add_to_pending_messages_for_peer (cp, pm, pr); | ||
1288 | } | ||
1250 | 1289 | ||
1251 | 1290 | ||
1252 | prev = NULL; | 1291 | |
1253 | pos = cp->pending_messages; | 1292 | /** |
1254 | while ( (pos != NULL) && | 1293 | * Closure used for "target_peer_select_cb". |
1255 | (pm->priority < pos->priority) ) | 1294 | */ |
1256 | { | 1295 | struct PeerSelectionContext |
1257 | prev = pos; | 1296 | { |
1258 | pos = pos->next; | 1297 | /** |
1259 | } | 1298 | * The request for which we are selecting |
1260 | if (prev == NULL) | 1299 | * peers. |
1261 | cp->pending_messages = pm; | 1300 | */ |
1262 | else | 1301 | struct PendingRequest *pr; |
1263 | prev->next = pm; | 1302 | |
1264 | pm->next = pos; | 1303 | /** |
1265 | if (cp->cth == NULL) | 1304 | * Current "prime" target. |
1266 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | 1305 | */ |
1267 | cp->pending_messages->priority, | 1306 | struct GNUNET_PeerIdentity target; |
1268 | GNUNET_TIME_UNIT_FOREVER_REL, | 1307 | |
1269 | peer, | 1308 | /** |
1270 | msize, | 1309 | * How much do we like this target? |
1271 | &transmit_request_cb, | 1310 | */ |
1272 | cp); | 1311 | double target_score; |
1273 | if (cp->cth == NULL) | 1312 | |
1313 | }; | ||
1314 | |||
1315 | |||
1316 | /** | ||
1317 | * Function called for each connected peer to determine | ||
1318 | * which one(s) would make good targets for forwarding. | ||
1319 | * | ||
1320 | * @param cls closure (struct PeerSelectionContext) | ||
1321 | * @param key current key code (peer identity) | ||
1322 | * @param value value in the hash map (struct ConnectedPeer) | ||
1323 | * @return GNUNET_YES if we should continue to | ||
1324 | * iterate, | ||
1325 | * GNUNET_NO if not. | ||
1326 | */ | ||
1327 | static int | ||
1328 | target_peer_select_cb (void *cls, | ||
1329 | const GNUNET_HashCode * key, | ||
1330 | void *value) | ||
1331 | { | ||
1332 | struct PeerSelectionContext *psc = cls; | ||
1333 | struct ConnectedPeer *cp = value; | ||
1334 | struct PendingRequest *pr = psc->pr; | ||
1335 | double score; | ||
1336 | unsigned int i; | ||
1337 | |||
1338 | /* 1) check if we have already (recently) forwarded to this peer */ | ||
1339 | for (i=0;i<pr->used_pids_off;i++) | ||
1340 | if (pr->used_pids[i] == cp->pid) | ||
1341 | return GNUNET_YES; /* skip */ | ||
1342 | // 2) calculate how much we'd like to forward to this peer | ||
1343 | score = 42; // FIXME! | ||
1344 | // FIXME: also need API to gather data on responsiveness | ||
1345 | // of this peer (we have fields for that in 'cp', but | ||
1346 | // they are never set!) | ||
1347 | |||
1348 | /* store best-fit in closure */ | ||
1349 | if (score > psc->target_score) | ||
1274 | { | 1350 | { |
1275 | /* technically, this should not be a 'break'; but | 1351 | psc->target_score = score; |
1276 | we don't handle this (rare) case yet, so let's warn | 1352 | psc->target.hashPubKey = *key; |
1277 | about it... */ | ||
1278 | GNUNET_break (0); | ||
1279 | // FIXME: now what? | ||
1280 | } | 1353 | } |
1354 | return GNUNET_YES; | ||
1281 | } | 1355 | } |
1282 | 1356 | ||
1283 | 1357 | ||
1284 | /** | 1358 | /** |
1285 | * Task that is run for each request with the goal of forwarding the | 1359 | * We're processing a GET request from another peer and have decided |
1286 | * associated query to other peers. The task should re-schedule | 1360 | * to forward it to other peers. This function is called periodically |
1287 | * itself to be re-run once the TTL has expired. (or at a later time | 1361 | * and should forward the request to other peers until we have all |
1288 | * if more peers should be queried earlier). | 1362 | * possible replies. If we have transmitted the *only* reply to |
1363 | * the initiator we should destroy the pending request. If we have | ||
1364 | * many replies in the queue to the initiator, we should delay sending | ||
1365 | * out more queries until the reply queue has shrunk some. | ||
1289 | * | 1366 | * |
1290 | * @param cls the requests "struct PendingRequest*" | 1367 | * @param cls our "struct ProcessGetContext *" |
1291 | * @param tc task context (unused) | 1368 | * @param tc unused |
1292 | */ | 1369 | */ |
1293 | static void | 1370 | static void |
1294 | forward_request_task (void *cls, | 1371 | forward_request_task (void *cls, |
1295 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1372 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
1296 | { | 1373 | { |
1297 | struct PendingRequest *pr = cls; | 1374 | struct PendingRequest *pr = cls; |
1298 | struct PeerSelectionContext psc; | 1375 | struct PeerSelectionContext psc; |
1376 | struct ConnectedPeer *cp; | ||
1299 | 1377 | ||
1300 | pr->task = GNUNET_SCHEDULER_NO_TASK; | 1378 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, |
1379 | get_processing_delay (), | ||
1380 | &forward_request_task, | ||
1381 | pr); | ||
1382 | if (pr->irc != NULL) | ||
1383 | return; /* previous request still pending */ | ||
1301 | /* (1) select target */ | 1384 | /* (1) select target */ |
1302 | psc.pr = pr; | 1385 | psc.pr = pr; |
1303 | psc.target_score = DBL_MIN; | 1386 | psc.target_score = DBL_MIN; |
1304 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, | 1387 | GNUNET_CONTAINER_multihashmap_iterate (connected_peers, |
1305 | &target_peer_select_cb, | 1388 | &target_peer_select_cb, |
1306 | &psc); | 1389 | &psc); |
1307 | if (psc.target_score == DBL_MIN) | 1390 | if (psc.target_score == DBL_MIN) |
1308 | { | 1391 | return; /* nobody selected */ |
1309 | /* no possible target found, wait some time */ | 1392 | |
1310 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | ||
1311 | get_processing_delay (), // FIXME: exponential back-off? or at least wait longer... | ||
1312 | &forward_request_task, | ||
1313 | pr); | ||
1314 | return; | ||
1315 | } | ||
1316 | /* (2) reserve reply bandwidth */ | 1393 | /* (2) reserve reply bandwidth */ |
1317 | GNUNET_assert (NULL == pr->irc); | 1394 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, |
1395 | &psc.target.hashPubKey); | ||
1318 | pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, | 1396 | pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, |
1319 | &psc.target, | 1397 | &psc.target, |
1320 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | 1398 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, |
1321 | -1, | 1399 | (uint32_t) -1 /* no limit */, |
1322 | DBLOCK_SIZE, // FIXME: make dependent on type? | 1400 | DBLOCK_SIZE, |
1323 | 0, | 1401 | (uint64_t) cp->inc_preference, |
1324 | &target_reservation_cb, | 1402 | &target_reservation_cb, |
1325 | pr); | 1403 | pr); |
1404 | cp->inc_preference = 0.0; | ||
1326 | } | 1405 | } |
1327 | 1406 | ||
1328 | 1407 | ||
1329 | /** | 1408 | /* **************************** P2P PUT Handling ************************ */ |
1330 | * We're processing (local) results for a search request | ||
1331 | * from a (local) client. Pass applicable results to the | ||
1332 | * client and if we are done either clean up (operation | ||
1333 | * complete) or switch to P2P search (more results possible). | ||
1334 | * | ||
1335 | * @param cls our closure (struct LocalGetContext) | ||
1336 | * @param key key for the content | ||
1337 | * @param size number of bytes in data | ||
1338 | * @param data content stored | ||
1339 | * @param type type of the content | ||
1340 | * @param priority priority of the content | ||
1341 | * @param anonymity anonymity-level for the content | ||
1342 | * @param expiration expiration time for the content | ||
1343 | * @param uid unique identifier for the datum; | ||
1344 | * maybe 0 if no unique identifier is available | ||
1345 | */ | ||
1346 | static void | ||
1347 | process_local_get_result (void *cls, | ||
1348 | const GNUNET_HashCode * key, | ||
1349 | uint32_t size, | ||
1350 | const void *data, | ||
1351 | uint32_t type, | ||
1352 | uint32_t priority, | ||
1353 | uint32_t anonymity, | ||
1354 | struct GNUNET_TIME_Absolute | ||
1355 | expiration, | ||
1356 | uint64_t uid) | ||
1357 | { | ||
1358 | struct LocalGetContext *lgc = cls; | ||
1359 | struct PendingRequest *pr; | ||
1360 | struct ClientRequestList *crl; | ||
1361 | struct ClientList *cl; | ||
1362 | size_t msize; | ||
1363 | unsigned int i; | ||
1364 | |||
1365 | if (key == NULL) | ||
1366 | { | ||
1367 | #if DEBUG_FS | ||
1368 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1369 | "Received last result for `%s' from local datastore, deciding what to do next.\n", | ||
1370 | GNUNET_h2s (&lgc->query)); | ||
1371 | #endif | ||
1372 | /* no further results from datastore; continue | ||
1373 | processing further requests from the client and | ||
1374 | allow the next task to use the datastore; also, | ||
1375 | switch to P2P requests or clean up our state. */ | ||
1376 | next_ds_request (); | ||
1377 | GNUNET_SERVER_receive_done (lgc->client, | ||
1378 | GNUNET_OK); | ||
1379 | if ( (lgc->results_used == 0) || | ||
1380 | (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || | ||
1381 | (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || | ||
1382 | (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) | ||
1383 | { | ||
1384 | #if DEBUG_FS | ||
1385 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1386 | "Forwarding query for `%s' to network.\n", | ||
1387 | GNUNET_h2s (&lgc->query)); | ||
1388 | #endif | ||
1389 | cl = clients; | ||
1390 | while ( (NULL != cl) && | ||
1391 | (cl->client != lgc->client) ) | ||
1392 | cl = cl->next; | ||
1393 | if (cl == NULL) | ||
1394 | { | ||
1395 | cl = GNUNET_malloc (sizeof (struct ClientList)); | ||
1396 | cl->client = lgc->client; | ||
1397 | cl->next = clients; | ||
1398 | clients = cl; | ||
1399 | } | ||
1400 | crl = GNUNET_malloc (sizeof (struct ClientRequestList)); | ||
1401 | crl->cl = cl; | ||
1402 | GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl); | ||
1403 | pr = GNUNET_malloc (sizeof (struct PendingRequest)); | ||
1404 | pr->client = lgc->client; | ||
1405 | GNUNET_SERVER_client_keep (pr->client); | ||
1406 | pr->crl_entry = crl; | ||
1407 | crl->req = pr; | ||
1408 | if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) | ||
1409 | { | ||
1410 | pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode)); | ||
1411 | *pr->namespace = lgc->namespace; | ||
1412 | } | ||
1413 | pr->replies_seen = lgc->results; | ||
1414 | lgc->results = NULL; | ||
1415 | pr->start_time = GNUNET_TIME_absolute_get (); | ||
1416 | pr->query = lgc->query; | ||
1417 | pr->target_pid = GNUNET_PEER_intern (&lgc->target); | ||
1418 | pr->replies_seen_off = lgc->results_used; | ||
1419 | pr->replies_seen_size = lgc->results_size; | ||
1420 | lgc->results_size = 0; | ||
1421 | pr->type = lgc->type; | ||
1422 | pr->anonymity_level = lgc->anonymity_level; | ||
1423 | pr->bf = refresh_bloomfilter (pr->replies_seen_off, | ||
1424 | &pr->mingle, | ||
1425 | &pr->bf_size, | ||
1426 | pr->replies_seen); | ||
1427 | GNUNET_CONTAINER_multihashmap_put (requests_by_query, | ||
1428 | &pr->query, | ||
1429 | pr, | ||
1430 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1431 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | ||
1432 | get_processing_delay (), | ||
1433 | &forward_request_task, | ||
1434 | pr); | ||
1435 | local_get_context_free (lgc); | ||
1436 | return; | ||
1437 | } | ||
1438 | /* got all possible results, clean up! */ | ||
1439 | #if DEBUG_FS | ||
1440 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1441 | "Found all possible results for query for `%s', done!\n", | ||
1442 | GNUNET_h2s (&lgc->query)); | ||
1443 | #endif | ||
1444 | local_get_context_free (lgc); | ||
1445 | return; | ||
1446 | } | ||
1447 | if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND) | ||
1448 | { | ||
1449 | #if DEBUG_FS | ||
1450 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1451 | "Received on-demand block for `%s' from local datastore, fetching data.\n", | ||
1452 | GNUNET_h2s (&lgc->query)); | ||
1453 | #endif | ||
1454 | GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, | ||
1455 | anonymity, expiration, uid, | ||
1456 | dsh, | ||
1457 | &process_local_get_result, | ||
1458 | lgc); | ||
1459 | return; | ||
1460 | } | ||
1461 | if ( (type != lgc->type) && | ||
1462 | (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) ) | ||
1463 | { | ||
1464 | /* this should be virtually impossible to reach (DBLOCK | ||
1465 | query hash being identical to KBLOCK/SBLOCK query hash); | ||
1466 | nevertheless, if it happens, the correct thing is to | ||
1467 | simply skip the result. */ | ||
1468 | #if DEBUG_FS | ||
1469 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1470 | "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n", | ||
1471 | type, | ||
1472 | lgc->type, | ||
1473 | GNUNET_h2s (&lgc->query)); | ||
1474 | #endif | ||
1475 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | ||
1476 | return; | ||
1477 | } | ||
1478 | /* check if this is a result we've alredy | ||
1479 | received */ | ||
1480 | for (i=0;i<lgc->results_used;i++) | ||
1481 | if (0 == memcmp (key, | ||
1482 | &lgc->results[i], | ||
1483 | sizeof (GNUNET_HashCode))) | ||
1484 | { | ||
1485 | #if DEBUG_FS | ||
1486 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1487 | "Received duplicate result for `%s' from local datastore, ignoring.\n", | ||
1488 | GNUNET_h2s (&lgc->query)); | ||
1489 | #endif | ||
1490 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | ||
1491 | return; | ||
1492 | } | ||
1493 | if (lgc->results_used == lgc->results_size) | ||
1494 | GNUNET_array_grow (lgc->results, | ||
1495 | lgc->results_size, | ||
1496 | lgc->results_size * 2 + 2); | ||
1497 | GNUNET_CRYPTO_hash (data, | ||
1498 | size, | ||
1499 | &lgc->results[lgc->results_used++]); | ||
1500 | msize = size + sizeof (struct ContentMessage); | ||
1501 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | ||
1502 | lgc->result = GNUNET_malloc (msize); | ||
1503 | lgc->result->header.size = htons (msize); | ||
1504 | lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); | ||
1505 | lgc->result->type = htonl (type); | ||
1506 | lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1507 | memcpy (&lgc->result[1], | ||
1508 | data, | ||
1509 | size); | ||
1510 | #if DEBUG_FS | ||
1511 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1512 | "Received new result for `%s' from local datastore, passing to client.\n", | ||
1513 | GNUNET_h2s (&lgc->query)); | ||
1514 | #endif | ||
1515 | GNUNET_SERVER_notify_transmit_ready (lgc->client, | ||
1516 | msize, | ||
1517 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1518 | &transmit_local_result, | ||
1519 | lgc); | ||
1520 | } | ||
1521 | 1409 | ||
1522 | 1410 | ||
1523 | /** | 1411 | /** |
1524 | * We're processing a search request from a local | 1412 | * Function called after we either failed or succeeded |
1525 | * client. Now it is our turn to query the datastore. | 1413 | * at transmitting a reply to a peer. |
1526 | * | ||
1527 | * @param cls our closure (struct LocalGetContext) | ||
1528 | * @param tc unused | ||
1529 | */ | ||
1530 | static void | ||
1531 | transmit_local_get (void *cls, | ||
1532 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1533 | { | ||
1534 | struct LocalGetContext *lgc = cls; | ||
1535 | uint32_t type; | ||
1536 | |||
1537 | type = lgc->type; | ||
1538 | if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) | ||
1539 | type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */ | ||
1540 | GNUNET_DATASTORE_get (dsh, | ||
1541 | &lgc->query, | ||
1542 | type, | ||
1543 | &process_local_get_result, | ||
1544 | lgc, | ||
1545 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1546 | } | ||
1547 | |||
1548 | |||
1549 | /** | ||
1550 | * We're processing a search request from a local | ||
1551 | * client. Now it is our turn to query the datastore. | ||
1552 | * | ||
1553 | * @param cls our closure (struct LocalGetContext) | ||
1554 | * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK | ||
1555 | */ | ||
1556 | static void | ||
1557 | transmit_local_get_ready (void *cls, | ||
1558 | int ok) | ||
1559 | { | ||
1560 | struct LocalGetContext *lgc = cls; | ||
1561 | |||
1562 | GNUNET_assert (GNUNET_OK == ok); | ||
1563 | GNUNET_SCHEDULER_add_continuation (sched, | ||
1564 | &transmit_local_get, | ||
1565 | lgc, | ||
1566 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
1567 | } | ||
1568 | |||
1569 | |||
1570 | /** | ||
1571 | * Handle START_SEARCH-message (search request from client). | ||
1572 | * | 1414 | * |
1573 | * @param cls closure | 1415 | * @param cls the requests "struct PendingRequest*" |
1574 | * @param client identification of the client | 1416 | * @param pid ID of receiving peer, 0 on transmission error |
1575 | * @param message the actual message | ||
1576 | */ | 1417 | */ |
1577 | static void | 1418 | static void |
1578 | handle_start_search (void *cls, | 1419 | transmit_reply_continuation (void *cls, |
1579 | struct GNUNET_SERVER_Client *client, | 1420 | GNUNET_PEER_Id tpid) |
1580 | const struct GNUNET_MessageHeader *message) | ||
1581 | { | 1421 | { |
1582 | const struct SearchMessage *sm; | 1422 | struct PendingRequest *pr = cls; |
1583 | struct LocalGetContext *lgc; | 1423 | |
1584 | uint16_t msize; | 1424 | switch (pr->type) |
1585 | unsigned int sc; | ||
1586 | |||
1587 | msize = ntohs (message->size); | ||
1588 | if ( (msize < sizeof (struct SearchMessage)) || | ||
1589 | (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) | ||
1590 | { | ||
1591 | GNUNET_break (0); | ||
1592 | GNUNET_SERVER_receive_done (client, | ||
1593 | GNUNET_SYSERR); | ||
1594 | return; | ||
1595 | } | ||
1596 | sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode); | ||
1597 | sm = (const struct SearchMessage*) message; | ||
1598 | GNUNET_SERVER_client_keep (client); | ||
1599 | lgc = GNUNET_malloc (sizeof (struct LocalGetContext)); | ||
1600 | if (sc > 0) | ||
1601 | { | ||
1602 | lgc->results_used = sc; | ||
1603 | GNUNET_array_grow (lgc->results, | ||
1604 | lgc->results_size, | ||
1605 | sc * 2); | ||
1606 | memcpy (lgc->results, | ||
1607 | &sm[1], | ||
1608 | sc * sizeof (GNUNET_HashCode)); | ||
1609 | } | ||
1610 | lgc->client = client; | ||
1611 | lgc->type = ntohl (sm->type); | ||
1612 | lgc->anonymity_level = ntohl (sm->anonymity_level); | ||
1613 | switch (lgc->type) | ||
1614 | { | 1425 | { |
1615 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: | 1426 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: |
1616 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: | 1427 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: |
1617 | lgc->target.hashPubKey = sm->target; | 1428 | /* only one reply expected, done with the request! */ |
1429 | destroy_pending_request (pr); | ||
1618 | break; | 1430 | break; |
1431 | case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: | ||
1619 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: | 1432 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: |
1620 | lgc->namespace = sm->target; | ||
1621 | break; | 1433 | break; |
1622 | default: | 1434 | default: |
1435 | GNUNET_break (0); | ||
1623 | break; | 1436 | break; |
1624 | } | 1437 | } |
1625 | lgc->query = sm->query; | ||
1626 | GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc); | ||
1627 | lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL, | ||
1628 | &transmit_local_get_ready, | ||
1629 | lgc); | ||
1630 | } | 1438 | } |
1631 | 1439 | ||
1632 | 1440 | ||
1633 | /** | 1441 | /** |
1634 | * List of handlers for the messages understood by this | 1442 | * Check if the given KBlock is well-formed. |
1635 | * service. | ||
1636 | */ | ||
1637 | static struct GNUNET_SERVER_MessageHandler handlers[] = { | ||
1638 | {&GNUNET_FS_handle_index_start, NULL, | ||
1639 | GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, | ||
1640 | {&GNUNET_FS_handle_index_list_get, NULL, | ||
1641 | GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, | ||
1642 | {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, | ||
1643 | sizeof (struct UnindexMessage) }, | ||
1644 | {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, | ||
1645 | 0 }, | ||
1646 | {NULL, NULL, 0, 0} | ||
1647 | }; | ||
1648 | |||
1649 | |||
1650 | /** | ||
1651 | * Clean up the memory used by the PendingRequest structure (except | ||
1652 | * for the client or peer list that the request may be part of). | ||
1653 | * | 1443 | * |
1654 | * @param pr request to clean up | 1444 | * @param kb the kblock data (or at least "dsize" bytes claiming to be one) |
1445 | * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)! | ||
1446 | * @param query where to store the query that this block answers | ||
1447 | * @return GNUNET_OK if this is actually a well-formed KBlock | ||
1655 | */ | 1448 | */ |
1656 | static void | 1449 | static int |
1657 | destroy_pending_request (struct PendingRequest *pr) | 1450 | check_kblock (const struct KBlock *kb, |
1451 | size_t dsize, | ||
1452 | GNUNET_HashCode *query) | ||
1658 | { | 1453 | { |
1659 | struct PendingMessage *reply; | 1454 | if (dsize < sizeof (struct KBlock)) |
1660 | struct ClientList *cl; | ||
1661 | |||
1662 | GNUNET_CONTAINER_multihashmap_remove (requests_by_query, | ||
1663 | &pr->query, | ||
1664 | pr); | ||
1665 | // FIXME: not sure how this can work (efficiently) | ||
1666 | // also, what does the return value mean? | ||
1667 | if (pr->irc != NULL) | ||
1668 | { | ||
1669 | GNUNET_CORE_peer_change_preference_cancel (pr->irc); | ||
1670 | pr->irc = NULL; | ||
1671 | } | ||
1672 | if (pr->client == NULL) | ||
1673 | { | 1455 | { |
1674 | GNUNET_CONTAINER_heap_remove_node (requests_by_expiration, | 1456 | GNUNET_break_op (0); |
1675 | pr->hnode); | 1457 | return GNUNET_SYSERR; |
1676 | } | 1458 | } |
1677 | else | 1459 | if (dsize - sizeof (struct KBlock) != |
1460 | ntohs (kb->purpose.size) | ||
1461 | - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) | ||
1462 | - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) | ||
1678 | { | 1463 | { |
1679 | cl = pr->crl_entry->cl; | 1464 | GNUNET_break_op (0); |
1680 | GNUNET_CONTAINER_DLL_remove (cl->head, | 1465 | return GNUNET_SYSERR; |
1681 | cl->tail, | ||
1682 | pr->crl_entry); | ||
1683 | } | 1466 | } |
1684 | if (GNUNET_SCHEDULER_NO_TASK != pr->task) | 1467 | if (GNUNET_OK != |
1685 | GNUNET_SCHEDULER_cancel (sched, pr->task); | 1468 | GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK, |
1686 | if (NULL != pr->bf) | 1469 | &kb->purpose, |
1687 | GNUNET_CONTAINER_bloomfilter_free (pr->bf); | 1470 | &kb->signature, |
1688 | if (NULL != pr->th) | 1471 | &kb->keyspace)) |
1689 | GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th); | ||
1690 | while (NULL != (reply = pr->replies_pending)) | ||
1691 | { | 1472 | { |
1692 | pr->replies_pending = reply->next; | 1473 | GNUNET_break_op (0); |
1693 | GNUNET_free (reply); | 1474 | return GNUNET_SYSERR; |
1694 | } | 1475 | } |
1695 | if (NULL != pr->cth) | 1476 | if (query != NULL) |
1696 | GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); | 1477 | GNUNET_CRYPTO_hash (&kb->keyspace, |
1697 | GNUNET_PEER_change_rc (pr->source_pid, -1); | 1478 | sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), |
1698 | GNUNET_PEER_change_rc (pr->target_pid, -1); | 1479 | query); |
1699 | GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); | 1480 | return GNUNET_OK; |
1700 | GNUNET_free_non_null (pr->used_pids); | ||
1701 | GNUNET_free_non_null (pr->replies_seen); | ||
1702 | GNUNET_free_non_null (pr->namespace); | ||
1703 | GNUNET_free (pr); | ||
1704 | } | 1481 | } |
1705 | 1482 | ||
1706 | 1483 | ||
1707 | /** | 1484 | /** |
1708 | * A client disconnected. Remove all of its pending queries. | 1485 | * Check if the given SBlock is well-formed. |
1709 | * | 1486 | * |
1710 | * @param cls closure, NULL | 1487 | * @param sb the sblock data (or at least "dsize" bytes claiming to be one) |
1711 | * @param client identification of the client | 1488 | * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)! |
1489 | * @param query where to store the query that this block answers | ||
1490 | * @param namespace where to store the namespace that this block belongs to | ||
1491 | * @return GNUNET_OK if this is actually a well-formed SBlock | ||
1712 | */ | 1492 | */ |
1713 | static void | 1493 | static int |
1714 | handle_client_disconnect (void *cls, | 1494 | check_sblock (const struct SBlock *sb, |
1715 | struct GNUNET_SERVER_Client | 1495 | size_t dsize, |
1716 | * client) | 1496 | GNUNET_HashCode *query, |
1497 | GNUNET_HashCode *namespace) | ||
1717 | { | 1498 | { |
1718 | struct LocalGetContext *lgc; | 1499 | if (dsize < sizeof (struct SBlock)) |
1719 | struct ClientList *cpos; | ||
1720 | struct ClientList *cprev; | ||
1721 | struct ClientRequestList *rl; | ||
1722 | |||
1723 | if (client == NULL) | ||
1724 | return; | ||
1725 | lgc = lgc_head; | ||
1726 | while ( (NULL != lgc) && | ||
1727 | (lgc->client != client) ) | ||
1728 | lgc = lgc->next; | ||
1729 | if (lgc != NULL) | ||
1730 | local_get_context_free (lgc); | ||
1731 | cprev = NULL; | ||
1732 | cpos = clients; | ||
1733 | while ( (NULL != cpos) && | ||
1734 | (clients->client != client) ) | ||
1735 | { | 1500 | { |
1736 | cprev = cpos; | 1501 | GNUNET_break_op (0); |
1737 | cpos = cpos->next; | 1502 | return GNUNET_SYSERR; |
1738 | } | 1503 | } |
1739 | if (cpos != NULL) | 1504 | if (dsize != |
1505 | ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature)) | ||
1740 | { | 1506 | { |
1741 | if (cprev == NULL) | 1507 | GNUNET_break_op (0); |
1742 | clients = cpos->next; | 1508 | return GNUNET_SYSERR; |
1743 | else | ||
1744 | cprev->next = cpos->next; | ||
1745 | while (NULL != (rl = cpos->head)) | ||
1746 | { | ||
1747 | cpos->head = rl->next; | ||
1748 | destroy_pending_request (rl->req); | ||
1749 | GNUNET_free (rl); | ||
1750 | } | ||
1751 | GNUNET_free (cpos); | ||
1752 | } | 1509 | } |
1510 | if (GNUNET_OK != | ||
1511 | GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK, | ||
1512 | &sb->purpose, | ||
1513 | &sb->signature, | ||
1514 | &sb->subspace)) | ||
1515 | { | ||
1516 | GNUNET_break_op (0); | ||
1517 | return GNUNET_SYSERR; | ||
1518 | } | ||
1519 | if (query != NULL) | ||
1520 | *query = sb->identifier; | ||
1521 | if (namespace != NULL) | ||
1522 | GNUNET_CRYPTO_hash (&sb->subspace, | ||
1523 | sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), | ||
1524 | namespace); | ||
1525 | return GNUNET_OK; | ||
1753 | } | 1526 | } |
1754 | 1527 | ||
1755 | 1528 | ||
1756 | /** | 1529 | /** |
1757 | * Iterator over entries in the "requests_by_query" map | 1530 | * Transmit the given message by copying it to the target buffer |
1758 | * that frees all the entries. | 1531 | * "buf". "buf" will be NULL and "size" zero if the socket was closed |
1759 | * | 1532 | * for writing in the meantime. In that case, do nothing |
1760 | * @param cls closure, NULL | 1533 | * (the disconnect or shutdown handler will take care of the rest). |
1761 | * @param key current key code (the query, unused) | 1534 | * If we were able to transmit messages and there are still more |
1762 | * @param value value in the hash map, of type "struct PendingRequest*" | 1535 | * pending, ask core again for further calls to this function. |
1763 | * @return GNUNET_YES (we should continue to iterate) | ||
1764 | */ | ||
1765 | static int | ||
1766 | destroy_pending_request_cb (void *cls, | ||
1767 | const GNUNET_HashCode * key, | ||
1768 | void *value) | ||
1769 | { | ||
1770 | struct PendingRequest *pr = value; | ||
1771 | |||
1772 | destroy_pending_request (pr); | ||
1773 | return GNUNET_YES; | ||
1774 | } | ||
1775 | |||
1776 | |||
1777 | /** | ||
1778 | * Task run during shutdown. | ||
1779 | * | 1536 | * |
1780 | * @param cls unused | 1537 | * @param cls closure, pointer to the 'struct ClientList*' |
1781 | * @param tc unused | 1538 | * @param size number of bytes available in buf |
1539 | * @param buf where the callee should write the message | ||
1540 | * @return number of bytes written to buf | ||
1782 | */ | 1541 | */ |
1783 | static void | 1542 | static size_t |
1784 | shutdown_task (void *cls, | 1543 | transmit_to_client (void *cls, |
1785 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1544 | size_t size, void *buf) |
1786 | { | 1545 | { |
1787 | if (NULL != core) | 1546 | struct ClientList *cl = cls; |
1788 | { | 1547 | char *cbuf = buf; |
1789 | GNUNET_CORE_disconnect (core); | 1548 | struct ClientResponseMessage *creply; |
1790 | core = NULL; | 1549 | size_t msize; |
1791 | } | 1550 | |
1792 | if (NULL != dsh) | 1551 | cl->th = NULL; |
1552 | if (NULL == buf) | ||
1793 | { | 1553 | { |
1794 | GNUNET_DATASTORE_disconnect (dsh, | 1554 | #if DEBUG_FS |
1795 | GNUNET_NO); | 1555 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1796 | dsh = NULL; | 1556 | "Not sending reply, client communication problem.\n"); |
1557 | #endif | ||
1558 | return 0; | ||
1797 | } | 1559 | } |
1798 | GNUNET_CONTAINER_multihashmap_iterate (requests_by_query, | 1560 | msize = 0; |
1799 | &destroy_pending_request_cb, | 1561 | while ( (NULL != (creply = cl->res_head) ) && |
1800 | NULL); | 1562 | (creply->msize <= size) ) |
1801 | while (clients != NULL) | 1563 | { |
1802 | handle_client_disconnect (NULL, | 1564 | memcpy (&cbuf[msize], &creply[1], creply->msize); |
1803 | clients->client); | 1565 | msize += creply->msize; |
1804 | GNUNET_CONTAINER_multihashmap_destroy (requests_by_query); | 1566 | size -= creply->msize; |
1805 | requests_by_query = NULL; | 1567 | GNUNET_CONTAINER_DLL_remove (cl->res_head, |
1806 | GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer); | 1568 | cl->res_tail, |
1807 | requests_by_peer = NULL; | 1569 | creply); |
1808 | GNUNET_CONTAINER_heap_destroy (requests_by_expiration); | 1570 | GNUNET_free (creply); |
1809 | requests_by_expiration = NULL; | 1571 | } |
1810 | // FIXME: iterate over entries and free individually? | 1572 | if (NULL != creply) |
1811 | // (or do we get disconnect notifications?) | 1573 | cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, |
1812 | GNUNET_CONTAINER_multihashmap_destroy (connected_peers); | 1574 | creply->msize, |
1813 | connected_peers = NULL; | 1575 | GNUNET_TIME_UNIT_FOREVER_REL, |
1576 | &transmit_to_client, | ||
1577 | cl); | ||
1578 | return msize; | ||
1814 | } | 1579 | } |
1815 | 1580 | ||
1816 | 1581 | ||
1817 | /** | 1582 | /** |
1818 | * Free (each) request made by the peer. | 1583 | * Closure for "process_reply" function. |
1819 | * | ||
1820 | * @param cls closure, points to peer that the request belongs to | ||
1821 | * @param key current key code | ||
1822 | * @param value value in the hash map | ||
1823 | * @return GNUNET_YES (we should continue to iterate) | ||
1824 | */ | 1584 | */ |
1825 | static int | 1585 | struct ProcessReplyClosure |
1826 | destroy_request (void *cls, | ||
1827 | const GNUNET_HashCode * key, | ||
1828 | void *value) | ||
1829 | { | 1586 | { |
1830 | const struct GNUNET_PeerIdentity * peer = cls; | 1587 | /** |
1831 | struct PendingRequest *pr = value; | 1588 | * The data for the reply. |
1832 | 1589 | */ | |
1833 | GNUNET_CONTAINER_multihashmap_remove (requests_by_peer, | 1590 | const void *data; |
1834 | &peer->hashPubKey, | ||
1835 | pr); | ||
1836 | destroy_pending_request (pr); | ||
1837 | return GNUNET_YES; | ||
1838 | } | ||
1839 | 1591 | ||
1592 | // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here! | ||
1840 | 1593 | ||
1594 | /** | ||
1595 | * When the reply expires. | ||
1596 | */ | ||
1597 | struct GNUNET_TIME_Absolute expiration; | ||
1841 | 1598 | ||
1842 | /** | 1599 | /** |
1843 | * Method called whenever a given peer connects. | 1600 | * Size of data. |
1844 | * | 1601 | */ |
1845 | * @param cls closure, not used | 1602 | size_t size; |
1846 | * @param peer peer identity this notification is about | ||
1847 | * @param latency reported latency of the connection with 'other' | ||
1848 | * @param distance reported distance (DV) to 'other' | ||
1849 | */ | ||
1850 | static void | ||
1851 | peer_connect_handler (void *cls, | ||
1852 | const struct | ||
1853 | GNUNET_PeerIdentity * peer, | ||
1854 | struct GNUNET_TIME_Relative latency, | ||
1855 | uint32_t distance) | ||
1856 | { | ||
1857 | struct ConnectedPeer *cp; | ||
1858 | 1603 | ||
1859 | cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); | 1604 | /** |
1860 | cp->pid = GNUNET_PEER_intern (peer); | 1605 | * Namespace that this reply belongs to |
1861 | GNUNET_CONTAINER_multihashmap_put (connected_peers, | 1606 | * (if it is of type SBLOCK). |
1862 | &peer->hashPubKey, | 1607 | */ |
1863 | cp, | 1608 | GNUNET_HashCode namespace; |
1864 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | 1609 | |
1865 | } | 1610 | /** |
1611 | * Type of the block. | ||
1612 | */ | ||
1613 | uint32_t type; | ||
1614 | |||
1615 | /** | ||
1616 | * How much was this reply worth to us? | ||
1617 | */ | ||
1618 | uint32_t priority; | ||
1619 | }; | ||
1866 | 1620 | ||
1867 | 1621 | ||
1868 | /** | 1622 | /** |
1869 | * Method called whenever a peer disconnects. | 1623 | * We have received a reply; handle it! |
1870 | * | 1624 | * |
1871 | * @param cls closure, not used | 1625 | * @param cls response (struct ProcessReplyClosure) |
1872 | * @param peer peer identity this notification is about | 1626 | * @param key our query |
1627 | * @param value value in the hash map (info about the query) | ||
1628 | * @return GNUNET_YES (we should continue to iterate) | ||
1873 | */ | 1629 | */ |
1874 | static void | 1630 | static int |
1875 | peer_disconnect_handler (void *cls, | 1631 | process_reply (void *cls, |
1876 | const struct | 1632 | const GNUNET_HashCode * key, |
1877 | GNUNET_PeerIdentity * peer) | 1633 | void *value) |
1878 | { | 1634 | { |
1635 | struct ProcessReplyClosure *prq = cls; | ||
1636 | struct PendingRequest *pr = value; | ||
1637 | struct PendingMessage *reply; | ||
1638 | struct ClientResponseMessage *creply; | ||
1639 | struct ClientList *cl; | ||
1640 | struct PutMessage *pm; | ||
1641 | struct ContentMessage *cm; | ||
1879 | struct ConnectedPeer *cp; | 1642 | struct ConnectedPeer *cp; |
1880 | struct PendingMessage *pm; | 1643 | GNUNET_HashCode chash; |
1644 | GNUNET_HashCode mhash; | ||
1645 | size_t msize; | ||
1646 | uint32_t prio; | ||
1881 | 1647 | ||
1882 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | 1648 | |
1883 | &peer->hashPubKey); | 1649 | GNUNET_CRYPTO_hash (prq->data, |
1884 | if (cp != NULL) | 1650 | prq->size, |
1651 | &chash); | ||
1652 | switch (prq->type) | ||
1885 | { | 1653 | { |
1886 | GNUNET_PEER_change_rc (cp->pid, -1); | 1654 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: |
1887 | GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); | 1655 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: |
1888 | if (NULL != cp->cth) | 1656 | /* only possible reply, stop requesting! */ |
1889 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | 1657 | while (NULL != pr->pending_head) |
1890 | while (NULL != (pm = cp->pending_messages)) | 1658 | destroy_pending_message_list_entry (pr->pending_head); |
1659 | GNUNET_break (GNUNET_YES == | ||
1660 | GNUNET_CONTAINER_multihashmap_remove (query_request_map, | ||
1661 | key, | ||
1662 | prq)); | ||
1663 | break; | ||
1664 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: | ||
1665 | if (0 != memcmp (pr->namespace, | ||
1666 | &prq->namespace, | ||
1667 | sizeof (GNUNET_HashCode))) | ||
1668 | return GNUNET_YES; /* wrong namespace */ | ||
1669 | /* then: fall-through! */ | ||
1670 | case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: | ||
1671 | if (pr->bf != NULL) | ||
1891 | { | 1672 | { |
1892 | cp->pending_messages = pm->next; | 1673 | mingle_hash (&chash, pr->mingle, &mhash); |
1893 | GNUNET_free (pm); | 1674 | if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf, |
1675 | &mhash)) | ||
1676 | return GNUNET_YES; /* duplicate */ | ||
1677 | GNUNET_CONTAINER_bloomfilter_add (pr->bf, | ||
1678 | &mhash); | ||
1894 | } | 1679 | } |
1895 | GNUNET_free (cp); | 1680 | if (pr->client_request_list != NULL) |
1681 | { | ||
1682 | cl = pr->client_request_list->client_list; | ||
1683 | if (pr->replies_seen_size == pr->replies_seen_off) | ||
1684 | { | ||
1685 | GNUNET_array_grow (pr->replies_seen, | ||
1686 | pr->replies_seen_size, | ||
1687 | pr->replies_seen_size * 2 + 4); | ||
1688 | // FIXME: recalculate BF! | ||
1689 | } | ||
1690 | pr->replies_seen[pr->replies_seen_off++] = chash; | ||
1691 | } | ||
1692 | break; | ||
1693 | case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: | ||
1694 | // FIXME: any checks against duplicates for SKBlocks? | ||
1695 | break; | ||
1696 | default: | ||
1697 | GNUNET_break (0); | ||
1698 | return GNUNET_YES; | ||
1896 | } | 1699 | } |
1897 | GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, | 1700 | prio = pr->priority; |
1898 | &peer->hashPubKey, | 1701 | prq->priority += pr->remaining_priority; |
1899 | &destroy_request, | 1702 | pr->remaining_priority = 0; |
1900 | (void*) peer); | 1703 | if (pr->client_request_list != NULL) |
1704 | { | ||
1705 | msize = sizeof (struct PutMessage) + prq->size; | ||
1706 | creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage)); | ||
1707 | creply->msize = msize; | ||
1708 | creply->client_list = cl; | ||
1709 | GNUNET_CONTAINER_DLL_insert_after (cl->res_head, | ||
1710 | cl->res_tail, | ||
1711 | cl->res_tail, | ||
1712 | creply); | ||
1713 | pm = (struct PutMessage*) &creply[1]; | ||
1714 | pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
1715 | pm->header.size = htons (msize); | ||
1716 | pm->type = htonl (prq->type); | ||
1717 | pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration)); | ||
1718 | memcpy (&creply[1], prq->data, prq->size); | ||
1719 | if (NULL == cl->th) | ||
1720 | cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client, | ||
1721 | msize, | ||
1722 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1723 | &transmit_to_client, | ||
1724 | cl); | ||
1725 | GNUNET_break (cl->th != NULL); | ||
1726 | } | ||
1727 | else | ||
1728 | { | ||
1729 | cp = pr->pht_entry->cp; | ||
1730 | msize = sizeof (struct ContentMessage) + prq->size; | ||
1731 | reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); | ||
1732 | reply->cont = &transmit_reply_continuation; | ||
1733 | reply->cont_cls = pr; | ||
1734 | reply->msize = msize; | ||
1735 | reply->priority = (uint32_t) -1; /* send replies first! */ | ||
1736 | cm = (struct ContentMessage*) &reply[1]; | ||
1737 | cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); | ||
1738 | cm->header.size = htons (msize); | ||
1739 | cm->type = htonl (prq->type); | ||
1740 | cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); | ||
1741 | memcpy (&reply[1], prq->data, prq->size); | ||
1742 | add_to_pending_messages_for_peer (cp, reply, pr); | ||
1743 | } | ||
1744 | |||
1745 | |||
1746 | // FIXME: implement hot-path routing statistics keeping! | ||
1747 | return GNUNET_YES; | ||
1901 | } | 1748 | } |
1902 | 1749 | ||
1903 | 1750 | ||
1904 | /** | 1751 | /** |
1905 | * We're processing a GET request from another peer and have decided | 1752 | * Handle P2P "PUT" message. |
1906 | * to forward it to other peers. | ||
1907 | * | 1753 | * |
1908 | * @param cls our "struct ProcessGetContext *" | 1754 | * @param cls closure, always NULL |
1909 | * @param tc unused | 1755 | * @param other the other peer involved (sender or receiver, NULL |
1756 | * for loopback messages where we are both sender and receiver) | ||
1757 | * @param message the actual message | ||
1758 | * @param latency reported latency of the connection with 'other' | ||
1759 | * @param distance reported distance (DV) to 'other' | ||
1760 | * @return GNUNET_OK to keep the connection open, | ||
1761 | * GNUNET_SYSERR to close it (signal serious error) | ||
1910 | */ | 1762 | */ |
1911 | static void | 1763 | static int |
1912 | forward_get_request (void *cls, | 1764 | handle_p2p_put (void *cls, |
1913 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1765 | const struct GNUNET_PeerIdentity *other, |
1766 | const struct GNUNET_MessageHeader *message, | ||
1767 | struct GNUNET_TIME_Relative latency, | ||
1768 | uint32_t distance) | ||
1914 | { | 1769 | { |
1915 | struct ProcessGetContext *pgc = cls; | 1770 | const struct PutMessage *put; |
1916 | struct PendingRequest *pr; | 1771 | uint16_t msize; |
1917 | struct PendingRequest *eer; | 1772 | size_t dsize; |
1918 | struct GNUNET_PeerIdentity target; | 1773 | uint32_t type; |
1774 | struct GNUNET_TIME_Absolute expiration; | ||
1775 | GNUNET_HashCode query; | ||
1776 | struct ProcessReplyClosure prq; | ||
1919 | 1777 | ||
1920 | pr = GNUNET_malloc (sizeof (struct PendingRequest)); | 1778 | msize = ntohs (message->size); |
1921 | if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm)) | 1779 | if (msize < sizeof (struct PutMessage)) |
1922 | { | ||
1923 | pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode)); | ||
1924 | *pr->namespace = pgc->namespace; | ||
1925 | } | ||
1926 | pr->bf = pgc->bf; | ||
1927 | pr->bf_size = pgc->bf_size; | ||
1928 | pgc->bf = NULL; | ||
1929 | pr->start_time = pgc->start_time; | ||
1930 | pr->query = pgc->query; | ||
1931 | pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to); | ||
1932 | if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm)) | ||
1933 | pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target); | ||
1934 | pr->anonymity_level = 1; /* default */ | ||
1935 | pr->priority = pgc->priority; | ||
1936 | pr->remaining_priority = pr->priority; | ||
1937 | pr->mingle = pgc->mingle; | ||
1938 | pr->ttl = pgc->ttl; | ||
1939 | pr->type = pgc->type; | ||
1940 | GNUNET_CONTAINER_multihashmap_put (requests_by_query, | ||
1941 | &pr->query, | ||
1942 | pr, | ||
1943 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1944 | GNUNET_CONTAINER_multihashmap_put (requests_by_peer, | ||
1945 | &pgc->reply_to.hashPubKey, | ||
1946 | pr, | ||
1947 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1948 | pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration, | ||
1949 | pr, | ||
1950 | pr->start_time.value + pr->ttl); | ||
1951 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests) | ||
1952 | { | 1780 | { |
1953 | /* expire oldest request! */ | 1781 | GNUNET_break_op(0); |
1954 | eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration); | 1782 | return GNUNET_SYSERR; |
1955 | GNUNET_PEER_resolve (eer->source_pid, | ||
1956 | &target); | ||
1957 | GNUNET_CONTAINER_multihashmap_remove (requests_by_peer, | ||
1958 | &target.hashPubKey, | ||
1959 | eer); | ||
1960 | destroy_pending_request (eer); | ||
1961 | } | 1783 | } |
1962 | pr->task = GNUNET_SCHEDULER_add_delayed (sched, | 1784 | put = (const struct PutMessage*) message; |
1963 | get_processing_delay (), | 1785 | dsize = msize - sizeof (struct PutMessage); |
1964 | &forward_request_task, | 1786 | type = ntohl (put->type); |
1965 | pr); | 1787 | expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration)); |
1966 | GNUNET_free (pgc); | ||
1967 | } | ||
1968 | /** | ||
1969 | * Transmit the given message by copying it to | ||
1970 | * the target buffer "buf". "buf" will be | ||
1971 | * NULL and "size" zero if the socket was closed for | ||
1972 | * writing in the meantime. In that case, only | ||
1973 | 1788 | ||
1974 | * free the message | 1789 | /* first, validate! */ |
1975 | * | 1790 | switch (type) |
1976 | * @param cls closure, pointer to the message | ||
1977 | * @param size number of bytes available in buf | ||
1978 | * @param buf where the callee should write the message | ||
1979 | * @return number of bytes written to buf | ||
1980 | */ | ||
1981 | static size_t | ||
1982 | transmit_message (void *cls, | ||
1983 | size_t size, void *buf) | ||
1984 | { | ||
1985 | struct GNUNET_MessageHeader *msg = cls; | ||
1986 | uint16_t msize; | ||
1987 | |||
1988 | if (NULL == buf) | ||
1989 | { | 1791 | { |
1990 | #if DEBUG_FS | 1792 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: |
1991 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1793 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: |
1992 | "Dropping reply, core too busy.\n"); | 1794 | GNUNET_CRYPTO_hash (&put[1], dsize, &query); |
1993 | #endif | 1795 | break; |
1994 | GNUNET_free (msg); | 1796 | case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: |
1995 | return 0; | 1797 | if (GNUNET_OK != |
1798 | check_kblock ((const struct KBlock*) &put[1], | ||
1799 | dsize, | ||
1800 | &query)) | ||
1801 | return GNUNET_SYSERR; | ||
1802 | break; | ||
1803 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: | ||
1804 | if (GNUNET_OK != | ||
1805 | check_sblock ((const struct SBlock*) &put[1], | ||
1806 | dsize, | ||
1807 | &query, | ||
1808 | &prq.namespace)) | ||
1809 | return GNUNET_SYSERR; | ||
1810 | break; | ||
1811 | case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: | ||
1812 | // FIXME -- validate SKBLOCK! | ||
1813 | GNUNET_break (0); | ||
1814 | return GNUNET_OK; | ||
1815 | default: | ||
1816 | /* unknown block type */ | ||
1817 | GNUNET_break_op (0); | ||
1818 | return GNUNET_SYSERR; | ||
1996 | } | 1819 | } |
1997 | msize = ntohs (msg->size); | 1820 | |
1998 | GNUNET_assert (size >= msize); | 1821 | /* now, lookup 'query' */ |
1999 | memcpy (buf, msg, msize); | 1822 | prq.data = (const void*) &put[1]; |
2000 | GNUNET_free (msg); | 1823 | prq.size = dsize; |
2001 | return msize; | 1824 | prq.type = type; |
1825 | prq.expiration = expiration; | ||
1826 | prq.priority = 0; | ||
1827 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | ||
1828 | &query, | ||
1829 | &process_reply, | ||
1830 | &prq); | ||
1831 | // FIXME: if migration is on and load is low, | ||
1832 | // queue to store data in datastore; | ||
1833 | // use "prq.priority" for that! | ||
1834 | return GNUNET_OK; | ||
2002 | } | 1835 | } |
2003 | 1836 | ||
2004 | 1837 | ||
2005 | /** | 1838 | /* **************************** P2P GET Handling ************************ */ |
2006 | * Test if the load on this peer is too high | ||
2007 | * to even consider processing the query at | ||
2008 | * all. | ||
2009 | * | ||
2010 | * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise | ||
2011 | */ | ||
2012 | static int | ||
2013 | test_load_too_high () | ||
2014 | { | ||
2015 | return GNUNET_NO; // FIXME | ||
2016 | } | ||
2017 | 1839 | ||
2018 | 1840 | ||
2019 | /** | 1841 | /** |
@@ -2034,142 +1856,101 @@ test_load_too_high () | |||
2034 | * maybe 0 if no unique identifier is available | 1856 | * maybe 0 if no unique identifier is available |
2035 | */ | 1857 | */ |
2036 | static void | 1858 | static void |
2037 | process_p2p_get_result (void *cls, | 1859 | process_local_reply (void *cls, |
2038 | const GNUNET_HashCode * key, | 1860 | const GNUNET_HashCode * key, |
2039 | uint32_t size, | 1861 | uint32_t size, |
2040 | const void *data, | 1862 | const void *data, |
2041 | uint32_t type, | 1863 | uint32_t type, |
2042 | uint32_t priority, | 1864 | uint32_t priority, |
2043 | uint32_t anonymity, | 1865 | uint32_t anonymity, |
2044 | struct GNUNET_TIME_Absolute | 1866 | struct GNUNET_TIME_Absolute |
2045 | expiration, | 1867 | expiration, |
2046 | uint64_t uid) | 1868 | uint64_t uid) |
2047 | { | 1869 | { |
2048 | struct ProcessGetContext *pgc = cls; | 1870 | struct PendingRequest *pr = cls; |
1871 | struct ProcessReplyClosure prq; | ||
2049 | GNUNET_HashCode dhash; | 1872 | GNUNET_HashCode dhash; |
2050 | GNUNET_HashCode mhash; | 1873 | GNUNET_HashCode mhash; |
2051 | struct PutMessage *reply; | 1874 | GNUNET_HashCode query; |
2052 | 1875 | ||
1876 | pr->drq = NULL; | ||
2053 | if (NULL == key) | 1877 | if (NULL == key) |
2054 | { | 1878 | { |
2055 | /* no more results */ | 1879 | /* no more results */ |
2056 | if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) && | 1880 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) |
2057 | ( (0 == pgc->results_found) || | 1881 | pr->task = GNUNET_SCHEDULER_add_now (sched, |
2058 | (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || | 1882 | &forward_request_task, |
2059 | (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || | 1883 | pr); |
2060 | (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) ) | ||
2061 | { | ||
2062 | GNUNET_SCHEDULER_add_continuation (sched, | ||
2063 | &forward_get_request, | ||
2064 | pgc, | ||
2065 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
2066 | } | ||
2067 | else | ||
2068 | { | ||
2069 | if (pgc->bf != NULL) | ||
2070 | GNUNET_CONTAINER_bloomfilter_free (pgc->bf); | ||
2071 | GNUNET_free (pgc); | ||
2072 | } | ||
2073 | next_ds_request (); | ||
2074 | return; | 1884 | return; |
2075 | } | 1885 | } |
2076 | if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND) | 1886 | if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND) |
2077 | { | 1887 | { |
2078 | GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, | 1888 | if (GNUNET_OK != |
2079 | anonymity, expiration, uid, dsh, | 1889 | GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, |
2080 | &process_p2p_get_result, | 1890 | anonymity, expiration, uid, |
2081 | pgc); | 1891 | &process_local_reply, |
1892 | pr)) | ||
1893 | GNUNET_FS_drq_get_next (GNUNET_YES); | ||
2082 | return; | 1894 | return; |
2083 | } | 1895 | } |
2084 | /* check for duplicates */ | 1896 | /* check for duplicates */ |
2085 | GNUNET_CRYPTO_hash (data, size, &dhash); | 1897 | GNUNET_CRYPTO_hash (data, size, &dhash); |
2086 | mingle_hash (&dhash, | 1898 | mingle_hash (&dhash, |
2087 | pgc->mingle, | 1899 | pr->mingle, |
2088 | &mhash); | 1900 | &mhash); |
2089 | if ( (pgc->bf != NULL) && | 1901 | if ( (pr->bf != NULL) && |
2090 | (GNUNET_YES == | 1902 | (GNUNET_YES == |
2091 | GNUNET_CONTAINER_bloomfilter_test (pgc->bf, | 1903 | GNUNET_CONTAINER_bloomfilter_test (pr->bf, |
2092 | &mhash)) ) | 1904 | &mhash)) ) |
2093 | { | 1905 | { |
2094 | #if DEBUG_FS | 1906 | #if DEBUG_FS |
2095 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1907 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2096 | "Result from datastore filtered by bloomfilter.\n"); | 1908 | "Result from datastore filtered by bloomfilter.\n"); |
2097 | #endif | 1909 | #endif |
2098 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 1910 | GNUNET_FS_drq_get_next (GNUNET_YES); |
2099 | return; | 1911 | return; |
2100 | } | 1912 | } |
2101 | pgc->results_found++; | 1913 | pr->results_found++; |
2102 | if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || | 1914 | if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || |
2103 | (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || | 1915 | (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || |
2104 | (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) | 1916 | (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) |
2105 | { | 1917 | { |
2106 | if (pgc->bf == NULL) | 1918 | if (pr->bf == NULL) |
2107 | { | 1919 | { |
2108 | pgc->bf_size = 32; | 1920 | pr->bf_size = 32; |
2109 | pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, | 1921 | pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, |
2110 | pgc->bf_size, | 1922 | pr->bf_size, |
2111 | BLOOMFILTER_K); | 1923 | BLOOMFILTER_K); |
2112 | } | 1924 | } |
2113 | GNUNET_CONTAINER_bloomfilter_add (pgc->bf, | 1925 | GNUNET_CONTAINER_bloomfilter_add (pr->bf, |
2114 | &mhash); | 1926 | &mhash); |
2115 | } | 1927 | } |
2116 | 1928 | memset (&prq, 0, sizeof (prq)); | |
2117 | reply = GNUNET_malloc (sizeof (struct PutMessage) + size); | 1929 | prq.data = data; |
2118 | reply->header.size = htons (sizeof (struct PutMessage) + size); | 1930 | prq.expiration = expiration; |
2119 | reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | 1931 | prq.size = size; |
2120 | reply->type = htonl (type); | 1932 | if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) && |
2121 | reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration)); | 1933 | (GNUNET_OK != check_sblock ((const struct SBlock*) data, |
2122 | memcpy (&reply[1], data, size); | 1934 | size, |
2123 | GNUNET_CORE_notify_transmit_ready (core, | 1935 | &query, |
2124 | pgc->priority, | 1936 | &prq.namespace)) ) |
2125 | ACCEPTABLE_REPLY_DELAY, | ||
2126 | &pgc->reply_to, | ||
2127 | sizeof (struct PutMessage) + size, | ||
2128 | &transmit_message, | ||
2129 | reply); | ||
2130 | if ( (GNUNET_YES == test_load_too_high()) || | ||
2131 | (pgc->results_found > 5 + 2 * pgc->priority) ) | ||
2132 | { | 1937 | { |
2133 | GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); | 1938 | GNUNET_break (0); |
2134 | pgc->policy &= ~ ROUTING_POLICY_FORWARD; | 1939 | /* FIXME: consider removing the block? */ |
1940 | GNUNET_FS_drq_get_next (GNUNET_YES); | ||
2135 | return; | 1941 | return; |
2136 | } | 1942 | } |
2137 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 1943 | prq.type = type; |
2138 | } | 1944 | prq.priority = priority; |
1945 | process_reply (&prq, key, pr); | ||
2139 | 1946 | ||
2140 | 1947 | if ( (GNUNET_YES == test_load_too_high()) || | |
2141 | /** | 1948 | (pr->results_found > 5 + 2 * pr->priority) ) |
2142 | * We're processing a GET request from another peer. Give it to our | ||
2143 | * local datastore. | ||
2144 | * | ||
2145 | * @param cls our "struct ProcessGetContext" | ||
2146 | * @param ok did we get a datastore slice or not? | ||
2147 | */ | ||
2148 | static void | ||
2149 | ds_get_request (void *cls, | ||
2150 | int ok) | ||
2151 | { | ||
2152 | struct ProcessGetContext *pgc = cls; | ||
2153 | uint32_t type; | ||
2154 | struct GNUNET_TIME_Relative timeout; | ||
2155 | |||
2156 | if (GNUNET_OK != ok) | ||
2157 | { | 1949 | { |
2158 | /* no point in doing P2P stuff if we can't even do local */ | 1950 | GNUNET_FS_drq_get_next (GNUNET_NO); |
2159 | GNUNET_free (dsh); | ||
2160 | return; | 1951 | return; |
2161 | } | 1952 | } |
2162 | type = pgc->type; | 1953 | GNUNET_FS_drq_get_next (GNUNET_YES); |
2163 | if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) | ||
2164 | type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */ | ||
2165 | timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, | ||
2166 | (pgc->priority + 1)); | ||
2167 | GNUNET_DATASTORE_get (dsh, | ||
2168 | &pgc->query, | ||
2169 | type, | ||
2170 | &process_p2p_get_result, | ||
2171 | pgc, | ||
2172 | timeout); | ||
2173 | } | 1954 | } |
2174 | 1955 | ||
2175 | 1956 | ||
@@ -2201,17 +1982,16 @@ bound_ttl (int32_t ttl_in, uint32_t prio) | |||
2201 | 1982 | ||
2202 | 1983 | ||
2203 | /** | 1984 | /** |
2204 | * We've received a request with the specified | 1985 | * We've received a request with the specified priority. Bound it |
2205 | * priority. Bound it according to how much | 1986 | * according to how much we trust the given peer. |
2206 | * we trust the given peer. | ||
2207 | * | 1987 | * |
2208 | * @param prio_in requested priority | 1988 | * @param prio_in requested priority |
2209 | * @param peer the peer making the request | 1989 | * @param cp the peer making the request |
2210 | * @return effective priority | 1990 | * @return effective priority |
2211 | */ | 1991 | */ |
2212 | static uint32_t | 1992 | static uint32_t |
2213 | bound_priority (uint32_t prio_in, | 1993 | bound_priority (uint32_t prio_in, |
2214 | const struct GNUNET_PeerIdentity *peer) | 1994 | struct ConnectedPeer *cp) |
2215 | { | 1995 | { |
2216 | return 0; // FIXME! | 1996 | return 0; // FIXME! |
2217 | } | 1997 | } |
@@ -2233,20 +2013,23 @@ static int | |||
2233 | handle_p2p_get (void *cls, | 2013 | handle_p2p_get (void *cls, |
2234 | const struct GNUNET_PeerIdentity *other, | 2014 | const struct GNUNET_PeerIdentity *other, |
2235 | const struct GNUNET_MessageHeader *message, | 2015 | const struct GNUNET_MessageHeader *message, |
2236 | struct GNUNET_TIME_Relative latency, | 2016 | struct GNUNET_TIME_Relative latency, |
2237 | uint32_t distance) | 2017 | uint32_t distance) |
2238 | { | 2018 | { |
2019 | struct PendingRequest *pr; | ||
2020 | struct PeerRequestEntry *pre; | ||
2021 | struct ConnectedPeer *cp; | ||
2022 | struct ConnectedPeer *cps; | ||
2023 | struct GNUNET_TIME_Relative timeout; | ||
2239 | uint16_t msize; | 2024 | uint16_t msize; |
2240 | const struct GetMessage *gm; | 2025 | const struct GetMessage *gm; |
2241 | unsigned int bits; | 2026 | unsigned int bits; |
2242 | const GNUNET_HashCode *opt; | 2027 | const GNUNET_HashCode *opt; |
2243 | struct ProcessGetContext *pgc; | ||
2244 | uint32_t bm; | 2028 | uint32_t bm; |
2245 | size_t bfsize; | 2029 | size_t bfsize; |
2246 | uint32_t ttl_decrement; | 2030 | uint32_t ttl_decrement; |
2031 | uint32_t type; | ||
2247 | double preference; | 2032 | double preference; |
2248 | int net_load_up; | ||
2249 | int net_load_down; | ||
2250 | 2033 | ||
2251 | msize = ntohs(message->size); | 2034 | msize = ntohs(message->size); |
2252 | if (msize < sizeof (struct GetMessage)) | 2035 | if (msize < sizeof (struct GetMessage)) |
@@ -2270,40 +2053,33 @@ handle_p2p_get (void *cls, | |||
2270 | } | 2053 | } |
2271 | opt = (const GNUNET_HashCode*) &gm[1]; | 2054 | opt = (const GNUNET_HashCode*) &gm[1]; |
2272 | bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); | 2055 | bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); |
2273 | pgc = GNUNET_malloc (sizeof (struct ProcessGetContext)); | 2056 | |
2274 | if (bfsize > 0) | 2057 | bm = ntohl (gm->hash_bitmap); |
2058 | if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) && | ||
2059 | (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ) | ||
2275 | { | 2060 | { |
2276 | pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1], | 2061 | GNUNET_break_op (0); |
2277 | bfsize, | 2062 | return GNUNET_SYSERR; |
2278 | BLOOMFILTER_K); | ||
2279 | pgc->bf_size = bfsize; | ||
2280 | } | 2063 | } |
2281 | pgc->type = ntohl (gm->type); | ||
2282 | pgc->bm = ntohl (gm->hash_bitmap); | ||
2283 | pgc->mingle = gm->filter_mutator; | ||
2284 | bits = 0; | 2064 | bits = 0; |
2285 | if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO)) | 2065 | cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, |
2286 | pgc->reply_to.hashPubKey = opt[bits++]; | 2066 | &other->hashPubKey); |
2067 | GNUNET_assert (NULL != cps); | ||
2068 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) | ||
2069 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
2070 | &opt[bits++]); | ||
2287 | else | 2071 | else |
2288 | pgc->reply_to = *other; | 2072 | cp = cps; |
2289 | if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) | 2073 | if (cp == NULL) |
2290 | pgc->namespace = opt[bits++]; | ||
2291 | else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) | ||
2292 | { | 2074 | { |
2293 | GNUNET_break_op (0); | 2075 | /* FIXME: try connect? */ |
2294 | GNUNET_free (pgc); | 2076 | return GNUNET_OK; |
2295 | return GNUNET_SYSERR; | ||
2296 | } | 2077 | } |
2297 | if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO)) | ||
2298 | pgc->prime_target.hashPubKey = opt[bits++]; | ||
2299 | /* note that we can really only check load here since otherwise | 2078 | /* note that we can really only check load here since otherwise |
2300 | peers could find out that we are overloaded by being disconnected | 2079 | peers could find out that we are overloaded by not being |
2301 | after sending us a malformed query... */ | 2080 | disconnected after sending us a malformed query... */ |
2302 | if (GNUNET_YES == test_load_too_high ()) | 2081 | if (GNUNET_YES == test_load_too_high ()) |
2303 | { | 2082 | { |
2304 | if (NULL != pgc->bf) | ||
2305 | GNUNET_CONTAINER_bloomfilter_free (pgc->bf); | ||
2306 | GNUNET_free (pgc); | ||
2307 | #if DEBUG_FS | 2083 | #if DEBUG_FS |
2308 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2084 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
2309 | "Dropping query from `%s', this peer is too busy.\n", | 2085 | "Dropping query from `%s', this peer is too busy.\n", |
@@ -2311,55 +2087,34 @@ handle_p2p_get (void *cls, | |||
2311 | #endif | 2087 | #endif |
2312 | return GNUNET_OK; | 2088 | return GNUNET_OK; |
2313 | } | 2089 | } |
2314 | net_load_up = 50; // FIXME | 2090 | |
2315 | net_load_down = 50; // FIXME | 2091 | pr = GNUNET_malloc (sizeof (struct PendingRequest) + |
2316 | pgc->policy = ROUTING_POLICY_NONE; | 2092 | (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)?sizeof(GNUNET_HashCode):0); |
2317 | if ( (net_load_up < IDLE_LOAD_THRESHOLD) && | 2093 | if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) |
2318 | (net_load_down < IDLE_LOAD_THRESHOLD) ) | 2094 | pr->namespace = (GNUNET_HashCode*) &pr[1]; |
2319 | { | 2095 | pr->type = ntohl (gm->type); |
2320 | pgc->policy |= ROUTING_POLICY_ALL; | 2096 | pr->mingle = gm->filter_mutator; |
2321 | pgc->priority = 0; /* no charge */ | 2097 | if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) |
2322 | } | 2098 | memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); |
2323 | else | 2099 | else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) |
2324 | { | ||
2325 | pgc->priority = bound_priority (ntohl (gm->priority), other); | ||
2326 | if ( (net_load_up < | ||
2327 | IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) && | ||
2328 | (net_load_down < | ||
2329 | IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) ) | ||
2330 | { | ||
2331 | pgc->policy |= ROUTING_POLICY_ALL; | ||
2332 | } | ||
2333 | else | ||
2334 | { | ||
2335 | // FIXME: is this sound? | ||
2336 | if (net_load_up < 90 + 10 * pgc->priority) | ||
2337 | pgc->policy |= ROUTING_POLICY_FORWARD; | ||
2338 | if (net_load_down < 90 + 10 * pgc->priority) | ||
2339 | pgc->policy |= ROUTING_POLICY_ANSWER; | ||
2340 | } | ||
2341 | } | ||
2342 | if (pgc->policy == ROUTING_POLICY_NONE) | ||
2343 | { | 2100 | { |
2344 | #if DEBUG_FS | 2101 | GNUNET_break_op (0); |
2345 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2102 | GNUNET_free (pr); |
2346 | "Dropping query from `%s', network saturated.\n", | 2103 | return GNUNET_SYSERR; |
2347 | GNUNET_i2s (other)); | ||
2348 | #endif | ||
2349 | if (NULL != pgc->bf) | ||
2350 | GNUNET_CONTAINER_bloomfilter_free (pgc->bf); | ||
2351 | GNUNET_free (pgc); | ||
2352 | return GNUNET_OK; /* drop */ | ||
2353 | } | 2104 | } |
2354 | if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT) | 2105 | if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) |
2355 | pgc->priority = 0; /* kill the priority (we cannot benefit) */ | 2106 | pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); |
2356 | pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority); | 2107 | |
2108 | pr->anonymity_level = 1; | ||
2109 | pr->priority = bound_priority (ntohl (gm->priority), cps); | ||
2110 | pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); | ||
2111 | pr->query = gm->query; | ||
2357 | /* decrement ttl (always) */ | 2112 | /* decrement ttl (always) */ |
2358 | ttl_decrement = 2 * TTL_DECREMENT + | 2113 | ttl_decrement = 2 * TTL_DECREMENT + |
2359 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 2114 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
2360 | TTL_DECREMENT); | 2115 | TTL_DECREMENT); |
2361 | if ( (pgc->ttl < 0) && | 2116 | if ( (pr->ttl < 0) && |
2362 | (pgc->ttl - ttl_decrement > 0) ) | 2117 | (pr->ttl - ttl_decrement > 0) ) |
2363 | { | 2118 | { |
2364 | #if DEBUG_FS | 2119 | #if DEBUG_FS |
2365 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 2120 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -2367,400 +2122,177 @@ handle_p2p_get (void *cls, | |||
2367 | GNUNET_i2s (other)); | 2122 | GNUNET_i2s (other)); |
2368 | #endif | 2123 | #endif |
2369 | /* integer underflow => drop (should be very rare)! */ | 2124 | /* integer underflow => drop (should be very rare)! */ |
2370 | if (NULL != pgc->bf) | 2125 | GNUNET_free (pr); |
2371 | GNUNET_CONTAINER_bloomfilter_free (pgc->bf); | ||
2372 | GNUNET_free (pgc); | ||
2373 | return GNUNET_OK; | 2126 | return GNUNET_OK; |
2127 | } | ||
2128 | pr->ttl -= ttl_decrement; | ||
2129 | pr->start_time = GNUNET_TIME_absolute_get (); | ||
2130 | |||
2131 | /* get bloom filter */ | ||
2132 | if (bfsize > 0) | ||
2133 | { | ||
2134 | pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], | ||
2135 | bfsize, | ||
2136 | BLOOMFILTER_K); | ||
2137 | pr->bf_size = bfsize; | ||
2374 | } | 2138 | } |
2375 | pgc->ttl -= ttl_decrement; | ||
2376 | pgc->start_time = GNUNET_TIME_absolute_get (); | ||
2377 | preference = (double) pgc->priority; | ||
2378 | if (preference < QUERY_BANDWIDTH_VALUE) | ||
2379 | preference = QUERY_BANDWIDTH_VALUE; | ||
2380 | // FIXME: also reserve bandwidth for reply? | ||
2381 | (void) GNUNET_CORE_peer_change_preference (sched, cfg, | ||
2382 | other, | ||
2383 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
2384 | 0, 0, preference, NULL, NULL); | ||
2385 | if (0 != (pgc->policy & ROUTING_POLICY_ANSWER)) | ||
2386 | pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY, | ||
2387 | &ds_get_request, | ||
2388 | pgc); | ||
2389 | else | ||
2390 | GNUNET_SCHEDULER_add_continuation (sched, | ||
2391 | &forward_get_request, | ||
2392 | pgc, | ||
2393 | GNUNET_SCHEDULER_REASON_PREREQ_DONE); | ||
2394 | return GNUNET_OK; | ||
2395 | } | ||
2396 | 2139 | ||
2140 | /* FIXME: check somewhere if request already exists, and if so, | ||
2141 | recycle old state... */ | ||
2142 | pre = GNUNET_malloc (sizeof (struct PeerRequestEntry)); | ||
2143 | pre->cp = cp; | ||
2144 | pre->req = pr; | ||
2145 | GNUNET_CONTAINER_multihashmap_put (query_request_map, | ||
2146 | &gm->query, | ||
2147 | pre, | ||
2148 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
2149 | |||
2150 | pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, | ||
2151 | pr, | ||
2152 | GNUNET_TIME_absolute_get().value + pr->ttl); | ||
2397 | 2153 | ||
2398 | /** | ||
2399 | * Function called to notify us that we can now transmit a reply to a | ||
2400 | * client or peer. "buf" will be NULL and "size" zero if the socket was | ||
2401 | * closed for writing in the meantime. | ||
2402 | * | ||
2403 | * @param cls closure, points to a "struct PendingRequest*" with | ||
2404 | * one or more pending replies | ||
2405 | * @param size number of bytes available in buf | ||
2406 | * @param buf where the callee should write the message | ||
2407 | * @return number of bytes written to buf | ||
2408 | */ | ||
2409 | static size_t | ||
2410 | transmit_result (void *cls, | ||
2411 | size_t size, | ||
2412 | void *buf) | ||
2413 | { | ||
2414 | struct PendingRequest *pr = cls; | ||
2415 | char *cbuf = buf; | ||
2416 | struct PendingMessage *reply; | ||
2417 | size_t ret; | ||
2418 | 2154 | ||
2419 | ret = 0; | 2155 | /* calculate change in traffic preference */ |
2420 | while (NULL != (reply = pr->replies_pending)) | 2156 | preference = (double) pr->priority; |
2421 | { | 2157 | if (preference < QUERY_BANDWIDTH_VALUE) |
2422 | if ( (reply->msize + ret < ret) || | 2158 | preference = QUERY_BANDWIDTH_VALUE; |
2423 | (reply->msize + ret > size) ) | 2159 | cps->inc_preference += preference; |
2424 | break; | ||
2425 | pr->replies_pending = reply->next; | ||
2426 | memcpy (&cbuf[ret], &reply[1], reply->msize); | ||
2427 | ret += reply->msize; | ||
2428 | GNUNET_free (reply); | ||
2429 | } | ||
2430 | return ret; | ||
2431 | } | ||
2432 | 2160 | ||
2161 | /* process locally */ | ||
2162 | type = pr->type; | ||
2163 | if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) | ||
2164 | type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */ | ||
2165 | timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, | ||
2166 | (pr->priority + 1)); | ||
2167 | pr->drq = GNUNET_FS_drq_get (&gm->query, | ||
2168 | pr->type, | ||
2169 | &process_local_reply, | ||
2170 | pr, | ||
2171 | timeout); | ||
2433 | 2172 | ||
2434 | /** | 2173 | /* Are multiple results possible? If so, start processing remotely now! */ |
2435 | * We have received a reply; handle it! | 2174 | switch (pr->type) |
2436 | * | ||
2437 | * @param cls response (struct ProcessReplyClosure) | ||
2438 | * @param key our query | ||
2439 | * @param value value in the hash map (meta-info about the query) | ||
2440 | * @return GNUNET_YES (we should continue to iterate) | ||
2441 | */ | ||
2442 | static int | ||
2443 | process_reply (void *cls, | ||
2444 | const GNUNET_HashCode * key, | ||
2445 | void *value) | ||
2446 | { | ||
2447 | struct ProcessReplyClosure *prq = cls; | ||
2448 | struct PendingRequest *pr = value; | ||
2449 | struct PendingRequest *eer; | ||
2450 | struct PendingMessage *reply; | ||
2451 | struct PutMessage *pm; | ||
2452 | struct ContentMessage *cm; | ||
2453 | struct ConnectedPeer *cp; | ||
2454 | GNUNET_HashCode chash; | ||
2455 | GNUNET_HashCode mhash; | ||
2456 | struct GNUNET_PeerIdentity target; | ||
2457 | size_t msize; | ||
2458 | uint32_t prio; | ||
2459 | struct GNUNET_TIME_Relative max_delay; | ||
2460 | |||
2461 | GNUNET_CRYPTO_hash (prq->data, | ||
2462 | prq->size, | ||
2463 | &chash); | ||
2464 | switch (prq->type) | ||
2465 | { | 2175 | { |
2466 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: | 2176 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: |
2467 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: | 2177 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: |
2178 | /* only one result, wait for datastore */ | ||
2468 | break; | 2179 | break; |
2469 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: | 2180 | default: |
2470 | /* FIXME: does prq->namespace match our expectations? */ | 2181 | pr->task = GNUNET_SCHEDULER_add_now (sched, |
2471 | /* then: fall-through??? */ | 2182 | &forward_request_task, |
2472 | case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: | 2183 | pr); |
2473 | if (pr->bf != NULL) | ||
2474 | { | ||
2475 | mingle_hash (&chash, pr->mingle, &mhash); | ||
2476 | if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf, | ||
2477 | &mhash)) | ||
2478 | return GNUNET_YES; /* duplicate */ | ||
2479 | GNUNET_CONTAINER_bloomfilter_add (pr->bf, | ||
2480 | &mhash); | ||
2481 | } | ||
2482 | break; | ||
2483 | case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: | ||
2484 | // FIXME: any checks against duplicates for SKBlocks? | ||
2485 | break; | ||
2486 | } | ||
2487 | prio = pr->priority; | ||
2488 | prq->priority += pr->remaining_priority; | ||
2489 | pr->remaining_priority = 0; | ||
2490 | if (pr->client != NULL) | ||
2491 | { | ||
2492 | if (pr->replies_seen_size == pr->replies_seen_off) | ||
2493 | GNUNET_array_grow (pr->replies_seen, | ||
2494 | pr->replies_seen_size, | ||
2495 | pr->replies_seen_size * 2 + 4); | ||
2496 | pr->replies_seen[pr->replies_seen_off++] = chash; | ||
2497 | // FIXME: possibly recalculate BF! | ||
2498 | } | 2184 | } |
2499 | if (pr->client == NULL) | ||
2500 | { | ||
2501 | GNUNET_PEER_resolve (pr->source_pid, | ||
2502 | &target); | ||
2503 | cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, | ||
2504 | &target.hashPubKey); | ||
2505 | msize = sizeof (struct ContentMessage) + prq->size; | ||
2506 | reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); | ||
2507 | reply->msize = msize; | ||
2508 | reply->priority = (uint32_t) -1; /* send replies first! */ | ||
2509 | cm = (struct ContentMessage*) &reply[1]; | ||
2510 | cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); | ||
2511 | cm->header.size = htons (msize); | ||
2512 | cm->type = htonl (prq->type); | ||
2513 | cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration); | ||
2514 | memcpy (&reply[1], prq->data, prq->size); | ||
2515 | max_delay = GNUNET_TIME_UNIT_FOREVER_REL; | ||
2516 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests) | ||
2517 | { | ||
2518 | /* estimate expiration time from time difference between | ||
2519 | first request that will be discarded and this request */ | ||
2520 | eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration); | ||
2521 | max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time, | ||
2522 | eer->start_time); | ||
2523 | } | ||
2524 | 2185 | ||
2525 | if (cp == NULL) | 2186 | /* make sure we don't track too many requests */ |
2526 | { | 2187 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) |
2527 | /* FIXME: bound queue size! */ | ||
2528 | reply->next = pr->replies_pending; | ||
2529 | pr->replies_pending = reply; | ||
2530 | if (pr->cth == NULL) | ||
2531 | { | ||
2532 | /* implicitly tries to connect */ | ||
2533 | pr->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
2534 | prio, | ||
2535 | max_delay, | ||
2536 | &target, | ||
2537 | msize, | ||
2538 | &transmit_result, | ||
2539 | pr); | ||
2540 | } | ||
2541 | } | ||
2542 | else | ||
2543 | { | ||
2544 | /* insert replies always at the head */ | ||
2545 | reply->next = cp->pending_messages; | ||
2546 | cp->pending_messages = reply; | ||
2547 | if (cp->cth == NULL) | ||
2548 | cp->cth = GNUNET_CORE_notify_transmit_ready (core, | ||
2549 | reply->priority, | ||
2550 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
2551 | &target, | ||
2552 | msize, | ||
2553 | &transmit_request_cb, | ||
2554 | cp); | ||
2555 | } | ||
2556 | } | ||
2557 | else | ||
2558 | { | 2188 | { |
2559 | msize = sizeof (struct PutMessage) + prq->size; | 2189 | pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); |
2560 | reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); | 2190 | destroy_pending_request (pr); |
2561 | reply->msize = msize; | ||
2562 | reply->next = pr->replies_pending; | ||
2563 | pm = (struct PutMessage*) &reply[1]; | ||
2564 | pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | ||
2565 | pm->header.size = htons (msize); | ||
2566 | pm->type = htonl (prq->type); | ||
2567 | pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration)); | ||
2568 | pr->replies_pending = reply; | ||
2569 | memcpy (&reply[1], prq->data, prq->size); | ||
2570 | if (pr->th != NULL) | ||
2571 | return GNUNET_YES; | ||
2572 | pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client, | ||
2573 | msize, | ||
2574 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
2575 | &transmit_result, | ||
2576 | pr); | ||
2577 | if (pr->th == NULL) | ||
2578 | { | ||
2579 | // FIXME: need to try again later (not much | ||
2580 | // to do here specifically, but we need to | ||
2581 | // check somewhere else to handle this case!) | ||
2582 | } | ||
2583 | } | 2191 | } |
2584 | // FIXME: implement hot-path routing statistics keeping! | ||
2585 | return GNUNET_YES; | ||
2586 | } | ||
2587 | |||
2588 | |||
2589 | /** | ||
2590 | * Check if the given KBlock is well-formed. | ||
2591 | * | ||
2592 | * @param kb the kblock data (or at least "dsize" bytes claiming to be one) | ||
2593 | * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)! | ||
2594 | * @param query where to store the query that this block answers | ||
2595 | * @return GNUNET_OK if this is actually a well-formed KBlock | ||
2596 | */ | ||
2597 | static int | ||
2598 | check_kblock (const struct KBlock *kb, | ||
2599 | size_t dsize, | ||
2600 | GNUNET_HashCode *query) | ||
2601 | { | ||
2602 | if (dsize < sizeof (struct KBlock)) | ||
2603 | { | ||
2604 | GNUNET_break_op (0); | ||
2605 | return GNUNET_SYSERR; | ||
2606 | } | ||
2607 | if (dsize - sizeof (struct KBlock) != | ||
2608 | ntohs (kb->purpose.size) | ||
2609 | - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) | ||
2610 | - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) | ||
2611 | { | ||
2612 | GNUNET_break_op (0); | ||
2613 | return GNUNET_SYSERR; | ||
2614 | } | ||
2615 | if (GNUNET_OK != | ||
2616 | GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK, | ||
2617 | &kb->purpose, | ||
2618 | &kb->signature, | ||
2619 | &kb->keyspace)) | ||
2620 | { | ||
2621 | GNUNET_break_op (0); | ||
2622 | return GNUNET_SYSERR; | ||
2623 | } | ||
2624 | if (query != NULL) | ||
2625 | GNUNET_CRYPTO_hash (&kb->keyspace, | ||
2626 | sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), | ||
2627 | query); | ||
2628 | return GNUNET_OK; | 2192 | return GNUNET_OK; |
2629 | } | 2193 | } |
2630 | 2194 | ||
2631 | 2195 | ||
2632 | /** | 2196 | /* **************************** CS GET Handling ************************ */ |
2633 | * Check if the given SBlock is well-formed. | ||
2634 | * | ||
2635 | * @param sb the sblock data (or at least "dsize" bytes claiming to be one) | ||
2636 | * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)! | ||
2637 | * @param query where to store the query that this block answers | ||
2638 | * @param namespace where to store the namespace that this block belongs to | ||
2639 | * @return GNUNET_OK if this is actually a well-formed SBlock | ||
2640 | */ | ||
2641 | static int | ||
2642 | check_sblock (const struct SBlock *sb, | ||
2643 | size_t dsize, | ||
2644 | GNUNET_HashCode *query, | ||
2645 | GNUNET_HashCode *namespace) | ||
2646 | { | ||
2647 | if (dsize < sizeof (struct SBlock)) | ||
2648 | { | ||
2649 | GNUNET_break_op (0); | ||
2650 | return GNUNET_SYSERR; | ||
2651 | } | ||
2652 | if (dsize != | ||
2653 | ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature)) | ||
2654 | { | ||
2655 | GNUNET_break_op (0); | ||
2656 | return GNUNET_SYSERR; | ||
2657 | } | ||
2658 | if (GNUNET_OK != | ||
2659 | GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK, | ||
2660 | &sb->purpose, | ||
2661 | &sb->signature, | ||
2662 | &sb->subspace)) | ||
2663 | { | ||
2664 | GNUNET_break_op (0); | ||
2665 | return GNUNET_SYSERR; | ||
2666 | } | ||
2667 | if (query != NULL) | ||
2668 | *query = sb->identifier; | ||
2669 | if (namespace != NULL) | ||
2670 | GNUNET_CRYPTO_hash (&sb->subspace, | ||
2671 | sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), | ||
2672 | namespace); | ||
2673 | return GNUNET_OK; | ||
2674 | } | ||
2675 | 2197 | ||
2676 | 2198 | ||
2677 | /** | 2199 | /** |
2678 | * Handle P2P "PUT" request. | 2200 | * Handle START_SEARCH-message (search request from client). |
2679 | * | 2201 | * |
2680 | * @param cls closure, always NULL | 2202 | * @param cls closure |
2681 | * @param other the other peer involved (sender or receiver, NULL | 2203 | * @param client identification of the client |
2682 | * for loopback messages where we are both sender and receiver) | ||
2683 | * @param message the actual message | 2204 | * @param message the actual message |
2684 | * @param latency reported latency of the connection with 'other' | ||
2685 | * @param distance reported distance (DV) to 'other' | ||
2686 | * @return GNUNET_OK to keep the connection open, | ||
2687 | * GNUNET_SYSERR to close it (signal serious error) | ||
2688 | */ | 2205 | */ |
2689 | static int | 2206 | static void |
2690 | handle_p2p_put (void *cls, | 2207 | handle_start_search (void *cls, |
2691 | const struct GNUNET_PeerIdentity *other, | 2208 | struct GNUNET_SERVER_Client *client, |
2692 | const struct GNUNET_MessageHeader *message, | 2209 | const struct GNUNET_MessageHeader *message) |
2693 | struct GNUNET_TIME_Relative latency, | ||
2694 | uint32_t distance) | ||
2695 | { | 2210 | { |
2696 | const struct PutMessage *put; | 2211 | static GNUNET_HashCode all_zeros; |
2212 | const struct SearchMessage *sm; | ||
2213 | struct ClientList *cl; | ||
2214 | struct ClientRequestList *crl; | ||
2215 | struct PendingRequest *pr; | ||
2697 | uint16_t msize; | 2216 | uint16_t msize; |
2698 | size_t dsize; | 2217 | unsigned int sc; |
2699 | uint32_t type; | 2218 | uint32_t type; |
2700 | struct GNUNET_TIME_Absolute expiration; | 2219 | |
2701 | GNUNET_HashCode query; | ||
2702 | struct ProcessReplyClosure prq; | ||
2703 | |||
2704 | msize = ntohs (message->size); | 2220 | msize = ntohs (message->size); |
2705 | if (msize < sizeof (struct PutMessage)) | 2221 | if ( (msize < sizeof (struct SearchMessage)) || |
2222 | (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) ) | ||
2706 | { | 2223 | { |
2707 | GNUNET_break_op(0); | 2224 | GNUNET_break (0); |
2708 | return GNUNET_SYSERR; | 2225 | GNUNET_SERVER_receive_done (client, |
2226 | GNUNET_SYSERR); | ||
2227 | return; | ||
2709 | } | 2228 | } |
2710 | put = (const struct PutMessage*) message; | 2229 | sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode); |
2711 | dsize = msize - sizeof (struct PutMessage); | 2230 | sm = (const struct SearchMessage*) message; |
2712 | type = ntohl (put->type); | ||
2713 | expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration)); | ||
2714 | 2231 | ||
2715 | /* first, validate! */ | 2232 | cl = client_list; |
2233 | while ( (cl != NULL) && | ||
2234 | (cl->client != client) ) | ||
2235 | cl = cl->next; | ||
2236 | if (cl == NULL) | ||
2237 | { | ||
2238 | cl = GNUNET_malloc (sizeof (struct ClientList)); | ||
2239 | cl->client = client; | ||
2240 | GNUNET_SERVER_client_keep (client); | ||
2241 | cl->next = client_list; | ||
2242 | client_list = cl; | ||
2243 | } | ||
2244 | type = ntohl (sm->type); | ||
2245 | |||
2246 | /* FIXME: detect duplicate request; if duplicate, simply update (merge) | ||
2247 | 'pr->replies_seen'! */ | ||
2248 | pr = GNUNET_malloc (sizeof (struct PendingRequest) + | ||
2249 | (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0); | ||
2250 | crl = GNUNET_malloc (sizeof (struct ClientRequestList)); | ||
2251 | crl->client_list = cl; | ||
2252 | GNUNET_CONTAINER_DLL_insert (cl->rl_head, | ||
2253 | cl->rl_tail, | ||
2254 | crl); | ||
2255 | crl->req = pr; | ||
2256 | pr->type = type; | ||
2257 | pr->client_request_list = crl; | ||
2258 | GNUNET_array_grow (pr->replies_seen, | ||
2259 | pr->replies_seen_size, | ||
2260 | sc); | ||
2261 | memcpy (pr->replies_seen, | ||
2262 | &sm[1], | ||
2263 | sc * sizeof (GNUNET_HashCode)); | ||
2264 | pr->replies_seen_off = sc; | ||
2265 | pr->anonymity_level = ntohl (sm->anonymity_level); | ||
2266 | pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, | ||
2267 | (uint32_t) -1); | ||
2268 | pr->query = sm->query; | ||
2716 | switch (type) | 2269 | switch (type) |
2717 | { | 2270 | { |
2718 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: | 2271 | case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: |
2719 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: | 2272 | case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: |
2720 | GNUNET_CRYPTO_hash (&put[1], dsize, &query); | 2273 | if (0 != memcmp (&sm->target, |
2721 | break; | 2274 | &all_zeros, |
2722 | case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: | 2275 | sizeof (GNUNET_HashCode))) |
2723 | if (GNUNET_OK != | 2276 | pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target); |
2724 | check_kblock ((const struct KBlock*) &put[1], | ||
2725 | dsize, | ||
2726 | &query)) | ||
2727 | return GNUNET_SYSERR; | ||
2728 | break; | 2277 | break; |
2729 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: | 2278 | case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: |
2730 | if (GNUNET_OK != | 2279 | pr->namespace = (GNUNET_HashCode*) &pr[1]; |
2731 | check_sblock ((const struct SBlock*) &put[1], | 2280 | memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode)); |
2732 | dsize, | ||
2733 | &query, | ||
2734 | &prq.namespace)) | ||
2735 | return GNUNET_SYSERR; | ||
2736 | break; | 2281 | break; |
2737 | case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: | ||
2738 | // FIXME -- validate SKBLOCK! | ||
2739 | GNUNET_break (0); | ||
2740 | return GNUNET_OK; | ||
2741 | default: | 2282 | default: |
2742 | /* unknown block type */ | 2283 | break; |
2743 | GNUNET_break_op (0); | ||
2744 | return GNUNET_SYSERR; | ||
2745 | } | 2284 | } |
2746 | 2285 | pr->drq = GNUNET_FS_drq_get (&sm->query, | |
2747 | /* now, lookup 'query' */ | 2286 | pr->type, |
2748 | prq.data = (const void*) &put[1]; | 2287 | &process_local_reply, |
2749 | prq.size = dsize; | 2288 | pr, |
2750 | prq.type = type; | 2289 | GNUNET_TIME_UNIT_FOREVER_REL); |
2751 | prq.expiration = expiration; | ||
2752 | prq.priority = 0; | ||
2753 | GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query, | ||
2754 | &query, | ||
2755 | &process_reply, | ||
2756 | &prq); | ||
2757 | // FIXME: if migration is on and load is low, | ||
2758 | // queue to store data in datastore; | ||
2759 | // use "prq.priority" for that! | ||
2760 | return GNUNET_OK; | ||
2761 | } | 2290 | } |
2762 | 2291 | ||
2763 | 2292 | ||
2293 | /* **************************** Startup ************************ */ | ||
2294 | |||
2295 | |||
2764 | /** | 2296 | /** |
2765 | * List of handlers for P2P messages | 2297 | * List of handlers for P2P messages |
2766 | * that we care about. | 2298 | * that we care about. |
@@ -2776,6 +2308,23 @@ static struct GNUNET_CORE_MessageHandler p2p_handlers[] = | |||
2776 | 2308 | ||
2777 | 2309 | ||
2778 | /** | 2310 | /** |
2311 | * List of handlers for the messages understood by this | ||
2312 | * service. | ||
2313 | */ | ||
2314 | static struct GNUNET_SERVER_MessageHandler handlers[] = { | ||
2315 | {&GNUNET_FS_handle_index_start, NULL, | ||
2316 | GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, | ||
2317 | {&GNUNET_FS_handle_index_list_get, NULL, | ||
2318 | GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, | ||
2319 | {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, | ||
2320 | sizeof (struct UnindexMessage) }, | ||
2321 | {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, | ||
2322 | 0 }, | ||
2323 | {NULL, NULL, 0, 0} | ||
2324 | }; | ||
2325 | |||
2326 | |||
2327 | /** | ||
2779 | * Process fs requests. | 2328 | * Process fs requests. |
2780 | * | 2329 | * |
2781 | * @param cls closure | 2330 | * @param cls closure |
@@ -2783,22 +2332,13 @@ static struct GNUNET_CORE_MessageHandler p2p_handlers[] = | |||
2783 | * @param server the initialized server | 2332 | * @param server the initialized server |
2784 | * @param c configuration to use | 2333 | * @param c configuration to use |
2785 | */ | 2334 | */ |
2786 | static void | 2335 | static int |
2787 | run (void *cls, | 2336 | main_init (struct GNUNET_SCHEDULER_Handle *s, |
2788 | struct GNUNET_SCHEDULER_Handle *s, | 2337 | struct GNUNET_SERVER_Handle *server, |
2789 | struct GNUNET_SERVER_Handle *server, | 2338 | const struct GNUNET_CONFIGURATION_Handle *c) |
2790 | const struct GNUNET_CONFIGURATION_Handle *c) | ||
2791 | { | 2339 | { |
2792 | sched = s; | 2340 | sched = s; |
2793 | cfg = c; | 2341 | cfg = c; |
2794 | |||
2795 | requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config | ||
2796 | requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config | ||
2797 | connected_peers = GNUNET_CONTAINER_multihashmap_create (64); | ||
2798 | requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
2799 | GNUNET_FS_init_indexing (sched, cfg); | ||
2800 | dsh = GNUNET_DATASTORE_connect (cfg, | ||
2801 | sched); | ||
2802 | core = GNUNET_CORE_connect (sched, | 2342 | core = GNUNET_CORE_connect (sched, |
2803 | cfg, | 2343 | cfg, |
2804 | GNUNET_TIME_UNIT_FOREVER_REL, | 2344 | GNUNET_TIME_UNIT_FOREVER_REL, |
@@ -2810,7 +2350,16 @@ run (void *cls, | |||
2810 | NULL, GNUNET_NO, | 2350 | NULL, GNUNET_NO, |
2811 | NULL, GNUNET_NO, | 2351 | NULL, GNUNET_NO, |
2812 | p2p_handlers); | 2352 | p2p_handlers); |
2813 | 2353 | if (NULL == core) | |
2354 | { | ||
2355 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
2356 | _("Failed to connect to `%s' service.\n"), | ||
2357 | "core"); | ||
2358 | return GNUNET_SYSERR; | ||
2359 | } | ||
2360 | query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config | ||
2361 | peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config | ||
2362 | requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
2814 | GNUNET_SERVER_disconnect_notify (server, | 2363 | GNUNET_SERVER_disconnect_notify (server, |
2815 | &handle_client_disconnect, | 2364 | &handle_client_disconnect, |
2816 | NULL); | 2365 | NULL); |
@@ -2819,21 +2368,30 @@ run (void *cls, | |||
2819 | GNUNET_TIME_UNIT_FOREVER_REL, | 2368 | GNUNET_TIME_UNIT_FOREVER_REL, |
2820 | &shutdown_task, | 2369 | &shutdown_task, |
2821 | NULL); | 2370 | NULL); |
2822 | if (NULL == dsh) | 2371 | return GNUNET_OK; |
2823 | { | 2372 | } |
2824 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2373 | |
2825 | _("Failed to connect to `%s' service.\n"), | 2374 | |
2826 | "datastore"); | 2375 | /** |
2827 | GNUNET_SCHEDULER_shutdown (sched); | 2376 | * Process fs requests. |
2828 | return; | 2377 | * |
2829 | } | 2378 | * @param cls closure |
2830 | if (NULL == core) | 2379 | * @param sched scheduler to use |
2831 | { | 2380 | * @param server the initialized server |
2832 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 2381 | * @param cfg configuration to use |
2833 | _("Failed to connect to `%s' service.\n"), | 2382 | */ |
2834 | "core"); | 2383 | static void |
2384 | run (void *cls, | ||
2385 | struct GNUNET_SCHEDULER_Handle *sched, | ||
2386 | struct GNUNET_SERVER_Handle *server, | ||
2387 | const struct GNUNET_CONFIGURATION_Handle *cfg) | ||
2388 | { | ||
2389 | if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) || | ||
2390 | (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) || | ||
2391 | (GNUNET_OK != main_init (sched, server, cfg)) ) | ||
2392 | { | ||
2835 | GNUNET_SCHEDULER_shutdown (sched); | 2393 | GNUNET_SCHEDULER_shutdown (sched); |
2836 | return; | 2394 | return; |
2837 | } | 2395 | } |
2838 | } | 2396 | } |
2839 | 2397 | ||
diff --git a/src/fs/gnunet-service-fs_drq.c b/src/fs/gnunet-service-fs_drq.c new file mode 100644 index 000000000..c15e37a0d --- /dev/null +++ b/src/fs/gnunet-service-fs_drq.c | |||
@@ -0,0 +1,416 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2009, 2010 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file fs/gnunet-service-fs_drq.c | ||
23 | * @brief queueing of requests to the datastore service | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #include "platform.h" | ||
27 | #include "gnunet-service-fs_drq.h" | ||
28 | |||
29 | |||
30 | /** | ||
31 | * Signature of a function that is called whenever a datastore | ||
32 | * request can be processed (or an entry put on the queue times out). | ||
33 | * | ||
34 | * @param cls closure | ||
35 | * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout | ||
36 | */ | ||
37 | typedef void (*RequestFunction)(void *cls, | ||
38 | int ok); | ||
39 | |||
40 | |||
41 | /** | ||
42 | * Doubly-linked list of our requests for the datastore. | ||
43 | */ | ||
44 | struct DatastoreRequestQueue | ||
45 | { | ||
46 | |||
47 | /** | ||
48 | * This is a doubly-linked list. | ||
49 | */ | ||
50 | struct DatastoreRequestQueue *next; | ||
51 | |||
52 | /** | ||
53 | * This is a doubly-linked list. | ||
54 | */ | ||
55 | struct DatastoreRequestQueue *prev; | ||
56 | |||
57 | /** | ||
58 | * Function to call (will issue the request). | ||
59 | */ | ||
60 | RequestFunction req; | ||
61 | |||
62 | /** | ||
63 | * Closure for req. | ||
64 | */ | ||
65 | void *req_cls; | ||
66 | |||
67 | /** | ||
68 | * When should this request time-out because we don't care anymore? | ||
69 | */ | ||
70 | struct GNUNET_TIME_Absolute timeout; | ||
71 | |||
72 | /** | ||
73 | * ID of task used for signaling timeout. | ||
74 | */ | ||
75 | GNUNET_SCHEDULER_TaskIdentifier task; | ||
76 | |||
77 | }; | ||
78 | |||
79 | /** | ||
80 | * Our scheduler. | ||
81 | */ | ||
82 | static struct GNUNET_SCHEDULER_Handle *sched; | ||
83 | |||
84 | /** | ||
85 | * Our configuration. | ||
86 | */ | ||
87 | const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
88 | |||
89 | /** | ||
90 | * Head of request queue for the datastore, sorted by timeout. | ||
91 | */ | ||
92 | static struct DatastoreRequestQueue *drq_head; | ||
93 | |||
94 | /** | ||
95 | * Tail of request queue for the datastore. | ||
96 | */ | ||
97 | static struct DatastoreRequestQueue *drq_tail; | ||
98 | |||
99 | /** | ||
100 | * Our connection to the datastore. | ||
101 | */ | ||
102 | static struct GNUNET_DATASTORE_Handle *dsh; | ||
103 | |||
104 | |||
105 | /** | ||
106 | * Run the next DS request in our | ||
107 | * queue, we're done with the current one. | ||
108 | */ | ||
109 | static void | ||
110 | next_ds_request () | ||
111 | { | ||
112 | struct DatastoreRequestQueue *e; | ||
113 | |||
114 | while (NULL != (e = drq_head)) | ||
115 | { | ||
116 | if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) | ||
117 | break; | ||
118 | if (e->task != GNUNET_SCHEDULER_NO_TASK) | ||
119 | GNUNET_SCHEDULER_cancel (sched, e->task); | ||
120 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | ||
121 | e->req (e->req_cls, GNUNET_NO); | ||
122 | GNUNET_free (e); | ||
123 | } | ||
124 | if (e == NULL) | ||
125 | return; | ||
126 | if (e->task != GNUNET_SCHEDULER_NO_TASK) | ||
127 | GNUNET_SCHEDULER_cancel (sched, e->task); | ||
128 | e->task = GNUNET_SCHEDULER_NO_TASK; | ||
129 | e->req (e->req_cls, GNUNET_YES); | ||
130 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | ||
131 | GNUNET_free (e); | ||
132 | } | ||
133 | |||
134 | |||
135 | /** | ||
136 | * A datastore request had to be timed out. | ||
137 | * | ||
138 | * @param cls closure (of type "struct DatastoreRequestQueue*") | ||
139 | * @param tc task context, unused | ||
140 | */ | ||
141 | static void | ||
142 | timeout_ds_request (void *cls, | ||
143 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
144 | { | ||
145 | struct DatastoreRequestQueue *e = cls; | ||
146 | |||
147 | e->task = GNUNET_SCHEDULER_NO_TASK; | ||
148 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); | ||
149 | e->req (e->req_cls, GNUNET_NO); | ||
150 | GNUNET_free (e); | ||
151 | } | ||
152 | |||
153 | |||
154 | static void | ||
155 | dequeue_ds_request (struct DatastoreRequestQueue *req) | ||
156 | { | ||
157 | if (req->task != GNUNET_SCHEDULER_NO_TASK) | ||
158 | GNUNET_SCHEDULER_cancel (sched, req->task); | ||
159 | GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req); | ||
160 | GNUNET_free (req); | ||
161 | } | ||
162 | |||
163 | |||
164 | /** | ||
165 | * Queue a request for the datastore. | ||
166 | * | ||
167 | * @param deadline by when the request should run | ||
168 | * @param fun function to call once the request can be run | ||
169 | * @param fun_cls closure for fun | ||
170 | * @return handle that can be used to dequeue the request | ||
171 | */ | ||
172 | static struct DatastoreRequestQueue * | ||
173 | queue_ds_request (struct GNUNET_TIME_Relative deadline, | ||
174 | RequestFunction fun, | ||
175 | void *fun_cls) | ||
176 | { | ||
177 | struct DatastoreRequestQueue *e; | ||
178 | struct DatastoreRequestQueue *bef; | ||
179 | |||
180 | if (drq_head == NULL) | ||
181 | { | ||
182 | /* no other requests pending, run immediately */ | ||
183 | // FIXME: should probably use scheduler nevertheless | ||
184 | // and return non-null! | ||
185 | fun (fun_cls, GNUNET_OK); | ||
186 | return NULL; | ||
187 | } | ||
188 | e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); | ||
189 | e->timeout = GNUNET_TIME_relative_to_absolute (deadline); | ||
190 | e->req = fun; | ||
191 | e->req_cls = fun_cls; | ||
192 | if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) | ||
193 | { | ||
194 | /* local request, highest prio, put at head of queue | ||
195 | regardless of deadline */ | ||
196 | bef = NULL; | ||
197 | } | ||
198 | else | ||
199 | { | ||
200 | bef = drq_tail; | ||
201 | while ( (NULL != bef) && | ||
202 | (e->timeout.value < bef->timeout.value) ) | ||
203 | bef = bef->prev; | ||
204 | } | ||
205 | GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); | ||
206 | if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) | ||
207 | return e; | ||
208 | e->task = GNUNET_SCHEDULER_add_delayed (sched, | ||
209 | deadline, | ||
210 | &timeout_ds_request, | ||
211 | e); | ||
212 | return e; | ||
213 | } | ||
214 | |||
215 | |||
216 | /** | ||
217 | * Task run during shutdown. | ||
218 | * | ||
219 | * @param cls unused | ||
220 | * @param tc unused | ||
221 | */ | ||
222 | static void | ||
223 | shutdown_task (void *cls, | ||
224 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
225 | { | ||
226 | struct DatastoreRequestQueue *drq; | ||
227 | |||
228 | GNUNET_assert (NULL != dsh); | ||
229 | GNUNET_DATASTORE_disconnect (dsh, | ||
230 | GNUNET_NO); | ||
231 | dsh = NULL; | ||
232 | while (NULL != (drq = drq_head)) | ||
233 | { | ||
234 | drq_head = drq->next; | ||
235 | drq->req (drq->req_cls, GNUNET_NO); | ||
236 | dequeue_ds_request (drq); | ||
237 | } | ||
238 | drq_tail = NULL; | ||
239 | } | ||
240 | |||
241 | |||
242 | struct GetClosure | ||
243 | { | ||
244 | GNUNET_HashCode key; | ||
245 | uint32_t type; | ||
246 | GNUNET_DATASTORE_Iterator iter; | ||
247 | void *iter_cls; | ||
248 | struct GNUNET_TIME_Absolute timeout; | ||
249 | }; | ||
250 | |||
251 | |||
252 | static void | ||
253 | get_iterator (void *cls, | ||
254 | const GNUNET_HashCode * key, | ||
255 | uint32_t size, | ||
256 | const void *data, | ||
257 | uint32_t type, | ||
258 | uint32_t priority, | ||
259 | uint32_t anonymity, | ||
260 | struct GNUNET_TIME_Absolute | ||
261 | expiration, | ||
262 | uint64_t uid) | ||
263 | { | ||
264 | struct GetClosure *gc = cls; | ||
265 | |||
266 | gc->iter (gc->iter_cls, | ||
267 | key, size, data, type, | ||
268 | priority, anonymity, expiration, uid); | ||
269 | if (key == NULL) | ||
270 | { | ||
271 | next_ds_request (); | ||
272 | GNUNET_free (gc); | ||
273 | } | ||
274 | } | ||
275 | |||
276 | |||
277 | static void | ||
278 | do_get (void *cls, | ||
279 | int ok) | ||
280 | { | ||
281 | struct GetClosure *gc = cls; | ||
282 | |||
283 | if (ok != GNUNET_OK) | ||
284 | { | ||
285 | gc->iter (gc->iter_cls, | ||
286 | NULL, 0, NULL, 0, 0, 0, | ||
287 | GNUNET_TIME_UNIT_ZERO_ABS, 0); | ||
288 | GNUNET_free (gc); | ||
289 | next_ds_request (); | ||
290 | return; | ||
291 | } | ||
292 | GNUNET_DATASTORE_get (dsh, &gc->key, gc->type, | ||
293 | &get_iterator, | ||
294 | gc, | ||
295 | GNUNET_TIME_absolute_get_remaining(gc->timeout)); | ||
296 | } | ||
297 | |||
298 | |||
299 | /** | ||
300 | * Iterate over the results for a particular key | ||
301 | * in the datastore. The iterator will only be called | ||
302 | * once initially; if the first call did contain a | ||
303 | * result, further results can be obtained by calling | ||
304 | * "GNUNET_DATASTORE_get_next" with the given argument. | ||
305 | * | ||
306 | * @param key maybe NULL (to match all entries) | ||
307 | * @param type desired type, 0 for any | ||
308 | * @param iter function to call on each matching value; | ||
309 | * will be called once with a NULL value at the end | ||
310 | * @param iter_cls closure for iter | ||
311 | * @param timeout how long to wait at most for a response | ||
312 | */ | ||
313 | struct DatastoreRequestQueue * | ||
314 | GNUNET_FS_drq_get (const GNUNET_HashCode * key, | ||
315 | uint32_t type, | ||
316 | GNUNET_DATASTORE_Iterator iter, | ||
317 | void *iter_cls, | ||
318 | struct GNUNET_TIME_Relative timeout) | ||
319 | { | ||
320 | struct GetClosure *gc; | ||
321 | |||
322 | gc = GNUNET_malloc (sizeof (struct GetClosure)); | ||
323 | gc->key = *key; | ||
324 | gc->type = type; | ||
325 | gc->iter = iter; | ||
326 | gc->iter_cls = iter_cls; | ||
327 | gc->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
328 | return queue_ds_request (timeout, | ||
329 | &do_get, | ||
330 | gc); | ||
331 | } | ||
332 | |||
333 | |||
334 | void | ||
335 | GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq) | ||
336 | { | ||
337 | dequeue_ds_request (drq); | ||
338 | } | ||
339 | |||
340 | |||
341 | /** | ||
342 | * Function called to trigger obtaining the next result | ||
343 | * from the datastore. | ||
344 | * | ||
345 | * @param more GNUNET_YES to get more results, GNUNET_NO to abort | ||
346 | * iteration (with a final call to "iter" with key/data == NULL). | ||
347 | */ | ||
348 | void | ||
349 | GNUNET_FS_drq_get_next (int more) | ||
350 | { | ||
351 | GNUNET_DATASTORE_get_next (dsh, more); | ||
352 | } | ||
353 | |||
354 | |||
355 | /** | ||
356 | * Explicitly remove some content from the database. | ||
357 | * The "cont"inuation will be called with status | ||
358 | * "GNUNET_OK" if content was removed, "GNUNET_NO" | ||
359 | * if no matching entry was found and "GNUNET_SYSERR" | ||
360 | * on all other types of errors. | ||
361 | * | ||
362 | * @param key key for the value | ||
363 | * @param size number of bytes in data | ||
364 | * @param data content stored | ||
365 | * @param cont continuation to call when done | ||
366 | * @param cont_cls closure for cont | ||
367 | * @param timeout how long to wait at most for a response | ||
368 | */ | ||
369 | void | ||
370 | GNUNET_FS_drq_remove (const GNUNET_HashCode *key, | ||
371 | uint32_t size, const void *data, | ||
372 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
373 | void *cont_cls, | ||
374 | struct GNUNET_TIME_Relative timeout) | ||
375 | { | ||
376 | if (dsh == NULL) | ||
377 | { | ||
378 | GNUNET_break (0); | ||
379 | return; | ||
380 | } | ||
381 | GNUNET_DATASTORE_remove (dsh, key, size, data, | ||
382 | cont, cont_cls, timeout); | ||
383 | } | ||
384 | |||
385 | |||
386 | /** | ||
387 | * Setup datastore request queues. | ||
388 | * | ||
389 | * @param s scheduler to use | ||
390 | * @param c configuration to use | ||
391 | * @return GNUNET_OK on success | ||
392 | */ | ||
393 | int | ||
394 | GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s, | ||
395 | const struct GNUNET_CONFIGURATION_Handle *c) | ||
396 | { | ||
397 | sched = s; | ||
398 | cfg = c; | ||
399 | dsh = GNUNET_DATASTORE_connect (cfg, | ||
400 | sched); | ||
401 | if (NULL == dsh) | ||
402 | { | ||
403 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
404 | _("Failed to connect to `%s' service.\n"), | ||
405 | "datastore"); | ||
406 | return GNUNET_SYSERR; | ||
407 | } | ||
408 | GNUNET_SCHEDULER_add_delayed (sched, | ||
409 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
410 | &shutdown_task, | ||
411 | NULL); | ||
412 | return GNUNET_OK; | ||
413 | } | ||
414 | |||
415 | |||
416 | /* end of gnunet-service-fs_drq.c */ | ||
diff --git a/src/fs/gnunet-service-fs_drq.h b/src/fs/gnunet-service-fs_drq.h new file mode 100644 index 000000000..306db9bad --- /dev/null +++ b/src/fs/gnunet-service-fs_drq.h | |||
@@ -0,0 +1,137 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2009, 2010 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file fs/gnunet-service-fs_drq.h | ||
23 | * @brief queueing of requests to the datastore service | ||
24 | * @author Christian Grothoff | ||
25 | */ | ||
26 | #ifndef GNUNET_SERVICE_FS_DRQ_H | ||
27 | #define GNUNET_SERVICE_FS_DRQ_H | ||
28 | |||
29 | #include "gnunet_datastore_service.h" | ||
30 | #include "gnunet_util_lib.h" | ||
31 | |||
32 | |||
33 | /** | ||
34 | * Handle for pending, abortable requests for the datastore. | ||
35 | */ | ||
36 | struct DatastoreRequestQueue; | ||
37 | |||
38 | |||
39 | /** | ||
40 | * Iterate over the results for a particular key | ||
41 | * in the datastore. The iterator will only be called | ||
42 | * once initially; if the first call did contain a | ||
43 | * result, further results can be obtained by calling | ||
44 | * "GNUNET_DATASTORE_get_next" with the given argument. | ||
45 | * | ||
46 | * @param key maybe NULL (to match all entries) | ||
47 | * @param type desired type, 0 for any | ||
48 | * @param iter function to call on each matching value; | ||
49 | * will be called once with a NULL value at the end | ||
50 | * @param iter_cls closure for iter | ||
51 | * @param timeout how long to wait at most for a response | ||
52 | */ | ||
53 | struct DatastoreRequestQueue * | ||
54 | GNUNET_FS_drq_get (const GNUNET_HashCode * key, | ||
55 | uint32_t type, | ||
56 | GNUNET_DATASTORE_Iterator iter, | ||
57 | void *iter_cls, | ||
58 | struct GNUNET_TIME_Relative timeout); | ||
59 | |||
60 | |||
61 | |||
62 | void | ||
63 | GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq); | ||
64 | |||
65 | |||
66 | /** | ||
67 | * Function called to trigger obtaining the next result | ||
68 | * from the datastore. Must be called (directly or indirectly) | ||
69 | * from the 'iter' callback given to 'GNUNET_FS_drq_get'. | ||
70 | * Not calling 'get_next' means no other datastore | ||
71 | * interactions (other than remove) will happen. | ||
72 | * | ||
73 | * @param more GNUNET_YES to get more results, GNUNET_NO to abort | ||
74 | * iteration (with a final call to "iter" with key/data == NULL). | ||
75 | */ | ||
76 | void | ||
77 | GNUNET_FS_drq_get_next (int more); | ||
78 | |||
79 | |||
80 | /** | ||
81 | * Explicitly remove some content from the database. | ||
82 | * The "cont"inuation will be called with status | ||
83 | * "GNUNET_OK" if content was removed, "GNUNET_NO" | ||
84 | * if no matching entry was found and "GNUNET_SYSERR" | ||
85 | * on all other types of errors. | ||
86 | * | ||
87 | * @param key key for the value | ||
88 | * @param size number of bytes in data | ||
89 | * @param data content stored | ||
90 | * @param cont continuation to call when done | ||
91 | * @param cont_cls closure for cont | ||
92 | * @param timeout how long to wait at most for a response | ||
93 | */ | ||
94 | void | ||
95 | GNUNET_FS_drq_remove (const GNUNET_HashCode *key, | ||
96 | uint32_t size, const void *data, | ||
97 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
98 | void *cont_cls, | ||
99 | struct GNUNET_TIME_Relative timeout); | ||
100 | |||
101 | |||
102 | |||
103 | /** | ||
104 | * Explicitly remove some content from the database. | ||
105 | * The "cont"inuation will be called with status | ||
106 | * "GNUNET_OK" if content was removed, "GNUNET_NO" | ||
107 | * if no matching entry was found and "GNUNET_SYSERR" | ||
108 | * on all other types of errors. | ||
109 | * | ||
110 | * @param key key for the value | ||
111 | * @param size number of bytes in data | ||
112 | * @param data content stored | ||
113 | * @param cont continuation to call when done | ||
114 | * @param cont_cls closure for cont | ||
115 | * @param timeout how long to wait at most for a response | ||
116 | */ | ||
117 | void | ||
118 | GNUNET_FS_drq_remove (const GNUNET_HashCode *key, | ||
119 | uint32_t size, const void *data, | ||
120 | GNUNET_DATASTORE_ContinuationWithStatus cont, | ||
121 | void *cont_cls, | ||
122 | struct GNUNET_TIME_Relative timeout); | ||
123 | /** | ||
124 | * Setup datastore request queues. | ||
125 | * | ||
126 | * @param s scheduler to use | ||
127 | * @param c configuration to use | ||
128 | * @return GNUNET_OK on success | ||
129 | */ | ||
130 | int | ||
131 | GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s, | ||
132 | const struct GNUNET_CONFIGURATION_Handle *c); | ||
133 | |||
134 | |||
135 | |||
136 | /* end of gnunet-service-fs_drq.h */ | ||
137 | #endif | ||
diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c index ebd7114d3..7dea53ee9 100644 --- a/src/fs/gnunet-service-fs_indexing.c +++ b/src/fs/gnunet-service-fs_indexing.c | |||
@@ -34,6 +34,7 @@ | |||
34 | #include "gnunet_protocols.h" | 34 | #include "gnunet_protocols.h" |
35 | #include "gnunet_signatures.h" | 35 | #include "gnunet_signatures.h" |
36 | #include "gnunet_util_lib.h" | 36 | #include "gnunet_util_lib.h" |
37 | #include "gnunet-service-fs_drq.h" | ||
37 | #include "gnunet-service-fs_indexing.h" | 38 | #include "gnunet-service-fs_indexing.h" |
38 | #include "fs.h" | 39 | #include "fs.h" |
39 | 40 | ||
@@ -508,13 +509,10 @@ remove_cont (void *cls, | |||
508 | int success, | 509 | int success, |
509 | const char *msg) | 510 | const char *msg) |
510 | { | 511 | { |
511 | struct GNUNET_DATASTORE_Handle *dsh = cls; | ||
512 | |||
513 | if (GNUNET_OK != success) | 512 | if (GNUNET_OK != success) |
514 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 513 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
515 | _("Failed to delete bogus block: %s\n"), | 514 | _("Failed to delete bogus block: %s\n"), |
516 | msg); | 515 | msg); |
517 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | ||
518 | } | 516 | } |
519 | 517 | ||
520 | 518 | ||
@@ -533,10 +531,11 @@ remove_cont (void *cls, | |||
533 | * @param expiration expiration time for the content | 531 | * @param expiration expiration time for the content |
534 | * @param uid unique identifier for the datum; | 532 | * @param uid unique identifier for the datum; |
535 | * maybe 0 if no unique identifier is available | 533 | * maybe 0 if no unique identifier is available |
536 | * @param cont function to call with the actual block | 534 | * @param cont function to call with the actual block (at most once, on success) |
537 | * @param cont_cls closure for cont | 535 | * @param cont_cls closure for cont |
536 | * @return GNUNET_OK on success | ||
538 | */ | 537 | */ |
539 | void | 538 | int |
540 | GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | 539 | GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, |
541 | uint32_t size, | 540 | uint32_t size, |
542 | const void *data, | 541 | const void *data, |
@@ -545,7 +544,6 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | |||
545 | uint32_t anonymity, | 544 | uint32_t anonymity, |
546 | struct GNUNET_TIME_Absolute | 545 | struct GNUNET_TIME_Absolute |
547 | expiration, uint64_t uid, | 546 | expiration, uint64_t uid, |
548 | struct GNUNET_DATASTORE_Handle *dsh, | ||
549 | GNUNET_DATASTORE_Iterator cont, | 547 | GNUNET_DATASTORE_Iterator cont, |
550 | void *cont_cls) | 548 | void *cont_cls) |
551 | { | 549 | { |
@@ -564,14 +562,13 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | |||
564 | if (size != sizeof (struct OnDemandBlock)) | 562 | if (size != sizeof (struct OnDemandBlock)) |
565 | { | 563 | { |
566 | GNUNET_break (0); | 564 | GNUNET_break (0); |
567 | GNUNET_DATASTORE_remove (dsh, | 565 | GNUNET_FS_drq_remove (key, |
568 | key, | 566 | size, |
569 | size, | 567 | data, |
570 | data, | 568 | &remove_cont, |
571 | &remove_cont, | 569 | NULL, |
572 | dsh, | 570 | GNUNET_TIME_UNIT_FOREVER_REL); |
573 | GNUNET_TIME_UNIT_FOREVER_REL); | 571 | return GNUNET_SYSERR; |
574 | return; | ||
575 | } | 572 | } |
576 | odb = (const struct OnDemandBlock*) data; | 573 | odb = (const struct OnDemandBlock*) data; |
577 | off = GNUNET_ntohll (odb->offset); | 574 | off = GNUNET_ntohll (odb->offset); |
@@ -600,8 +597,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | |||
600 | GNUNET_DISK_file_close (fh); | 597 | GNUNET_DISK_file_close (fh); |
601 | /* FIXME: if this happens often, we need | 598 | /* FIXME: if this happens often, we need |
602 | to remove the OnDemand block from the DS! */ | 599 | to remove the OnDemand block from the DS! */ |
603 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 600 | return GNUNET_SYSERR; |
604 | return; | ||
605 | } | 601 | } |
606 | GNUNET_DISK_file_close (fh); | 602 | GNUNET_DISK_file_close (fh); |
607 | GNUNET_CRYPTO_hash (ndata, | 603 | GNUNET_CRYPTO_hash (ndata, |
@@ -626,8 +622,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | |||
626 | (unsigned long long) off); | 622 | (unsigned long long) off); |
627 | /* FIXME: if this happens often, we need | 623 | /* FIXME: if this happens often, we need |
628 | to remove the OnDemand block from the DS! */ | 624 | to remove the OnDemand block from the DS! */ |
629 | GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); | 625 | return GNUNET_SYSERR; |
630 | return; | ||
631 | } | 626 | } |
632 | cont (cont_cls, | 627 | cont (cont_cls, |
633 | key, | 628 | key, |
@@ -638,6 +633,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | |||
638 | anonymity, | 633 | anonymity, |
639 | expiration, | 634 | expiration, |
640 | uid); | 635 | uid); |
636 | return GNUNET_OK; | ||
641 | } | 637 | } |
642 | 638 | ||
643 | 639 | ||
@@ -671,8 +667,8 @@ shutdown_task (void *cls, | |||
671 | * @param s scheduler to use | 667 | * @param s scheduler to use |
672 | * @param c configuration to use | 668 | * @param c configuration to use |
673 | */ | 669 | */ |
674 | void | 670 | int |
675 | GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, | 671 | GNUNET_FS_indexing_init (struct GNUNET_SCHEDULER_Handle *s, |
676 | const struct GNUNET_CONFIGURATION_Handle *c) | 672 | const struct GNUNET_CONFIGURATION_Handle *c) |
677 | { | 673 | { |
678 | sched = s; | 674 | sched = s; |
@@ -683,6 +679,7 @@ GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, | |||
683 | &shutdown_task, | 679 | &shutdown_task, |
684 | NULL); | 680 | NULL); |
685 | read_index_list (); | 681 | read_index_list (); |
682 | return GNUNET_OK; | ||
686 | } | 683 | } |
687 | 684 | ||
688 | /* end of gnunet-service-fs_indexing.c */ | 685 | /* end of gnunet-service-fs_indexing.c */ |
diff --git a/src/fs/gnunet-service-fs_indexing.h b/src/fs/gnunet-service-fs_indexing.h index dc6427234..9749b42a0 100644 --- a/src/fs/gnunet-service-fs_indexing.h +++ b/src/fs/gnunet-service-fs_indexing.h | |||
@@ -49,11 +49,11 @@ | |||
49 | * @param expiration expiration time for the content | 49 | * @param expiration expiration time for the content |
50 | * @param uid unique identifier for the datum; | 50 | * @param uid unique identifier for the datum; |
51 | * maybe 0 if no unique identifier is available | 51 | * maybe 0 if no unique identifier is available |
52 | * @param dsh connection to the datastore (to ask for more) | 52 | * @param cont function to call with the actual block (at most once, on success) |
53 | * @param cont function to call with the actual block | ||
54 | * @param cont_cls closure for cont | 53 | * @param cont_cls closure for cont |
54 | * @return GNUNET_OK on success | ||
55 | */ | 55 | */ |
56 | void | 56 | int |
57 | GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | 57 | GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, |
58 | uint32_t size, | 58 | uint32_t size, |
59 | const void *data, | 59 | const void *data, |
@@ -62,7 +62,6 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, | |||
62 | uint32_t anonymity, | 62 | uint32_t anonymity, |
63 | struct GNUNET_TIME_Absolute | 63 | struct GNUNET_TIME_Absolute |
64 | expiration, uint64_t uid, | 64 | expiration, uint64_t uid, |
65 | struct GNUNET_DATASTORE_Handle *dsh, | ||
66 | GNUNET_DATASTORE_Iterator cont, | 65 | GNUNET_DATASTORE_Iterator cont, |
67 | void *cont_cls); | 66 | void *cont_cls); |
68 | 67 | ||
@@ -112,9 +111,10 @@ GNUNET_FS_handle_unindex (void *cls, | |||
112 | * | 111 | * |
113 | * @param s scheduler to use | 112 | * @param s scheduler to use |
114 | * @param c configuration to use | 113 | * @param c configuration to use |
114 | * @return GNUNET_OK on success | ||
115 | */ | 115 | */ |
116 | void | 116 | int |
117 | GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, | 117 | GNUNET_FS_indexing_init (struct GNUNET_SCHEDULER_Handle *s, |
118 | const struct GNUNET_CONFIGURATION_Handle *c); | 118 | const struct GNUNET_CONFIGURATION_Handle *c); |
119 | 119 | ||
120 | 120 | ||