diff options
Diffstat (limited to 'src/dht/gnunet-service-wdht_clients.c')
-rw-r--r-- | src/dht/gnunet-service-wdht_clients.c | 1449 |
1 files changed, 1449 insertions, 0 deletions
diff --git a/src/dht/gnunet-service-wdht_clients.c b/src/dht/gnunet-service-wdht_clients.c new file mode 100644 index 000000000..2e4d4ef0a --- /dev/null +++ b/src/dht/gnunet-service-wdht_clients.c | |||
@@ -0,0 +1,1449 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file dht/gnunet-service-dht_clients.c | ||
23 | * @brief GNUnet DHT service's client management code | ||
24 | * @author Christian Grothoff | ||
25 | * @author Nathan Evans | ||
26 | */ | ||
27 | |||
28 | #include "platform.h" | ||
29 | #include "gnunet_constants.h" | ||
30 | #include "gnunet_protocols.h" | ||
31 | #include "gnunet_statistics_service.h" | ||
32 | #include "gnunet-service-wdht.h" | ||
33 | #include "gnunet-service-wdht_clients.h" | ||
34 | #include "gnunet-service-wdht_datacache.h" | ||
35 | #include "gnunet-service-wdht_neighbours.h" | ||
36 | #include "dht.h" | ||
37 | |||
38 | |||
39 | /** | ||
40 | * Should routing details be logged to stderr (for debugging)? | ||
41 | */ | ||
42 | #define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__) | ||
43 | |||
44 | #define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__) | ||
45 | |||
46 | #define DEBUG(...) \ | ||
47 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) | ||
48 | |||
49 | /** | ||
50 | * Linked list of messages to send to clients. | ||
51 | */ | ||
52 | struct PendingMessage | ||
53 | { | ||
54 | /** | ||
55 | * Pointer to next item in the list | ||
56 | */ | ||
57 | struct PendingMessage *next; | ||
58 | |||
59 | /** | ||
60 | * Pointer to previous item in the list | ||
61 | */ | ||
62 | struct PendingMessage *prev; | ||
63 | |||
64 | /** | ||
65 | * Actual message to be sent, allocated at the end of the struct: | ||
66 | * // msg = (cast) &pm[1]; | ||
67 | * // memcpy (&pm[1], data, len); | ||
68 | */ | ||
69 | const struct GNUNET_MessageHeader *msg; | ||
70 | |||
71 | }; | ||
72 | |||
73 | |||
74 | /** | ||
75 | * Struct containing information about a client, | ||
76 | * handle to connect to it, and any pending messages | ||
77 | * that need to be sent to it. | ||
78 | */ | ||
79 | struct ClientList | ||
80 | { | ||
81 | /** | ||
82 | * Linked list of active clients | ||
83 | */ | ||
84 | struct ClientList *next; | ||
85 | |||
86 | /** | ||
87 | * Linked list of active clients | ||
88 | */ | ||
89 | struct ClientList *prev; | ||
90 | |||
91 | /** | ||
92 | * The handle to this client | ||
93 | */ | ||
94 | struct GNUNET_SERVER_Client *client_handle; | ||
95 | |||
96 | /** | ||
97 | * Handle to the current transmission request, NULL | ||
98 | * if none pending. | ||
99 | */ | ||
100 | struct GNUNET_SERVER_TransmitHandle *transmit_handle; | ||
101 | |||
102 | /** | ||
103 | * Linked list of pending messages for this client | ||
104 | */ | ||
105 | struct PendingMessage *pending_head; | ||
106 | |||
107 | /** | ||
108 | * Tail of linked list of pending messages for this client | ||
109 | */ | ||
110 | struct PendingMessage *pending_tail; | ||
111 | |||
112 | }; | ||
113 | |||
114 | |||
115 | /** | ||
116 | * Entry in the local forwarding map for a client's GET request. | ||
117 | */ | ||
118 | struct ClientQueryRecord | ||
119 | { | ||
120 | |||
121 | /** | ||
122 | * The key this request was about | ||
123 | */ | ||
124 | struct GNUNET_HashCode key; | ||
125 | |||
126 | /** | ||
127 | * Client responsible for the request. | ||
128 | */ | ||
129 | struct ClientList *client; | ||
130 | |||
131 | /** | ||
132 | * Extended query (see gnunet_block_lib.h), allocated at the end of this struct. | ||
133 | */ | ||
134 | const void *xquery; | ||
135 | |||
136 | /** | ||
137 | * Replies we have already seen for this request. | ||
138 | */ | ||
139 | struct GNUNET_HashCode *seen_replies; | ||
140 | |||
141 | /** | ||
142 | * Pointer to this nodes heap location in the retry-heap (for fast removal) | ||
143 | */ | ||
144 | struct GNUNET_CONTAINER_HeapNode *hnode; | ||
145 | |||
146 | /** | ||
147 | * What's the delay between re-try operations that we currently use for this | ||
148 | * request? | ||
149 | */ | ||
150 | struct GNUNET_TIME_Relative retry_frequency; | ||
151 | |||
152 | /** | ||
153 | * What's the next time we should re-try this request? | ||
154 | */ | ||
155 | struct GNUNET_TIME_Absolute retry_time; | ||
156 | |||
157 | /** | ||
158 | * The unique identifier of this request | ||
159 | */ | ||
160 | uint64_t unique_id; | ||
161 | |||
162 | /** | ||
163 | * Number of bytes in xquery. | ||
164 | */ | ||
165 | size_t xquery_size; | ||
166 | |||
167 | /** | ||
168 | * Number of entries in 'seen_replies'. | ||
169 | */ | ||
170 | unsigned int seen_replies_count; | ||
171 | |||
172 | /** | ||
173 | * Desired replication level | ||
174 | */ | ||
175 | uint32_t replication; | ||
176 | |||
177 | /** | ||
178 | * Any message options for this request | ||
179 | */ | ||
180 | uint32_t msg_options; | ||
181 | |||
182 | /** | ||
183 | * The type for the data for the GET request. | ||
184 | */ | ||
185 | enum GNUNET_BLOCK_Type type; | ||
186 | |||
187 | }; | ||
188 | |||
189 | |||
190 | /** | ||
191 | * Struct containing paremeters of monitoring requests. | ||
192 | */ | ||
193 | struct ClientMonitorRecord | ||
194 | { | ||
195 | |||
196 | /** | ||
197 | * Next element in DLL. | ||
198 | */ | ||
199 | struct ClientMonitorRecord *next; | ||
200 | |||
201 | /** | ||
202 | * Previous element in DLL. | ||
203 | */ | ||
204 | struct ClientMonitorRecord *prev; | ||
205 | |||
206 | /** | ||
207 | * Type of blocks that are of interest | ||
208 | */ | ||
209 | enum GNUNET_BLOCK_Type type; | ||
210 | |||
211 | /** | ||
212 | * Key of data of interest, NULL for all. | ||
213 | */ | ||
214 | struct GNUNET_HashCode *key; | ||
215 | |||
216 | /** | ||
217 | * Flag whether to notify about GET messages. | ||
218 | */ | ||
219 | int16_t get; | ||
220 | |||
221 | /** | ||
222 | * Flag whether to notify about GET_REPONSE messages. | ||
223 | */ | ||
224 | int16_t get_resp; | ||
225 | |||
226 | /** | ||
227 | * Flag whether to notify about PUT messages. | ||
228 | */ | ||
229 | uint16_t put; | ||
230 | |||
231 | /** | ||
232 | * Client to notify of these requests. | ||
233 | */ | ||
234 | struct ClientList *client; | ||
235 | }; | ||
236 | |||
237 | |||
238 | /** | ||
239 | * List of active clients. | ||
240 | */ | ||
241 | static struct ClientList *client_head; | ||
242 | |||
243 | /** | ||
244 | * List of active clients. | ||
245 | */ | ||
246 | static struct ClientList *client_tail; | ||
247 | |||
248 | /** | ||
249 | * List of active monitoring requests. | ||
250 | */ | ||
251 | static struct ClientMonitorRecord *monitor_head; | ||
252 | |||
253 | /** | ||
254 | * List of active monitoring requests. | ||
255 | */ | ||
256 | static struct ClientMonitorRecord *monitor_tail; | ||
257 | |||
258 | /** | ||
259 | * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries. | ||
260 | */ | ||
261 | static struct GNUNET_CONTAINER_MultiHashMap *forward_map; | ||
262 | |||
263 | /** | ||
264 | * Heap with all of our client's request, sorted by retry time (earliest on top). | ||
265 | */ | ||
266 | static struct GNUNET_CONTAINER_Heap *retry_heap; | ||
267 | |||
268 | /** | ||
269 | * Task that re-transmits requests (using retry_heap). | ||
270 | */ | ||
271 | static struct GNUNET_SCHEDULER_Task * retry_task; | ||
272 | |||
273 | |||
274 | /** | ||
275 | * Task run to check for messages that need to be sent to a client. | ||
276 | * | ||
277 | * @param client a ClientList, containing the client and any messages to be sent to it | ||
278 | */ | ||
279 | static void | ||
280 | process_pending_messages (struct ClientList *client); | ||
281 | |||
282 | |||
283 | /** | ||
284 | * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready | ||
285 | * request. A ClientList is passed as closure, take the head of the list | ||
286 | * and copy it into buf, which has the result of sending the message to the | ||
287 | * client. | ||
288 | * | ||
289 | * @param cls closure to this call | ||
290 | * @param size maximum number of bytes available to send | ||
291 | * @param buf where to copy the actual message to | ||
292 | * | ||
293 | * @return the number of bytes actually copied, 0 indicates failure | ||
294 | */ | ||
295 | static size_t | ||
296 | send_reply_to_client (void *cls, size_t size, void *buf) | ||
297 | { | ||
298 | struct ClientList *client = cls; | ||
299 | char *cbuf = buf; | ||
300 | struct PendingMessage *reply; | ||
301 | size_t off; | ||
302 | size_t msize; | ||
303 | |||
304 | client->transmit_handle = NULL; | ||
305 | if (buf == NULL) | ||
306 | { | ||
307 | /* client disconnected */ | ||
308 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
309 | "Client %p disconnected, pending messages will be discarded\n", | ||
310 | client->client_handle); | ||
311 | return 0; | ||
312 | } | ||
313 | off = 0; | ||
314 | while ((NULL != (reply = client->pending_head)) && | ||
315 | (size >= off + (msize = ntohs (reply->msg->size)))) | ||
316 | { | ||
317 | GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail, | ||
318 | reply); | ||
319 | memcpy (&cbuf[off], reply->msg, msize); | ||
320 | GNUNET_free (reply); | ||
321 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n", | ||
322 | msize, client->client_handle); | ||
323 | off += msize; | ||
324 | } | ||
325 | process_pending_messages (client); | ||
326 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n", | ||
327 | (unsigned int) off, (unsigned int) size, client->client_handle); | ||
328 | return off; | ||
329 | } | ||
330 | |||
331 | |||
332 | /** | ||
333 | * Task run to check for messages that need to be sent to a client. | ||
334 | * | ||
335 | * @param client a ClientList, containing the client and any messages to be sent to it | ||
336 | */ | ||
337 | static void | ||
338 | process_pending_messages (struct ClientList *client) | ||
339 | { | ||
340 | if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) | ||
341 | { | ||
342 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
343 | "Not asking for transmission to %p now: %s\n", | ||
344 | client->client_handle, | ||
345 | client->pending_head == | ||
346 | NULL ? "no more messages" : "request already pending"); | ||
347 | return; | ||
348 | } | ||
349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
350 | "Asking for transmission of %u bytes to client %p\n", | ||
351 | ntohs (client->pending_head->msg->size), client->client_handle); | ||
352 | client->transmit_handle = | ||
353 | GNUNET_SERVER_notify_transmit_ready (client->client_handle, | ||
354 | ntohs (client->pending_head-> | ||
355 | msg->size), | ||
356 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
357 | &send_reply_to_client, client); | ||
358 | } | ||
359 | |||
360 | |||
361 | /** | ||
362 | * Add a PendingMessage to the clients list of messages to be sent | ||
363 | * | ||
364 | * @param client the active client to send the message to | ||
365 | * @param pending_message the actual message to send | ||
366 | */ | ||
367 | static void | ||
368 | add_pending_message (struct ClientList *client, | ||
369 | struct PendingMessage *pending_message) | ||
370 | { | ||
371 | GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, | ||
372 | pending_message); | ||
373 | process_pending_messages (client); | ||
374 | } | ||
375 | |||
376 | |||
377 | /** | ||
378 | * Closure for 'forward_reply' | ||
379 | */ | ||
380 | struct ForwardReplyContext | ||
381 | { | ||
382 | |||
383 | /** | ||
384 | * Actual message to send to matching clients. | ||
385 | */ | ||
386 | struct PendingMessage *pm; | ||
387 | |||
388 | /** | ||
389 | * Embedded payload. | ||
390 | */ | ||
391 | const void *data; | ||
392 | |||
393 | /** | ||
394 | * Type of the data. | ||
395 | */ | ||
396 | enum GNUNET_BLOCK_Type type; | ||
397 | |||
398 | /** | ||
399 | * Number of bytes in data. | ||
400 | */ | ||
401 | size_t data_size; | ||
402 | |||
403 | /** | ||
404 | * Do we need to copy 'pm' because it was already used? | ||
405 | */ | ||
406 | int do_copy; | ||
407 | |||
408 | }; | ||
409 | |||
410 | |||
411 | /** | ||
412 | * Find a client if it exists, add it otherwise. | ||
413 | * | ||
414 | * @param client the server handle to the client | ||
415 | * | ||
416 | * @return the client if found, a new client otherwise | ||
417 | */ | ||
418 | static struct ClientList * | ||
419 | find_active_client (struct GNUNET_SERVER_Client *client) | ||
420 | { | ||
421 | struct ClientList *pos = client_head; | ||
422 | struct ClientList *ret; | ||
423 | |||
424 | while (pos != NULL) | ||
425 | { | ||
426 | if (pos->client_handle == client) | ||
427 | return pos; | ||
428 | pos = pos->next; | ||
429 | } | ||
430 | ret = GNUNET_new (struct ClientList); | ||
431 | ret->client_handle = client; | ||
432 | GNUNET_CONTAINER_DLL_insert (client_head, client_tail, ret); | ||
433 | return ret; | ||
434 | } | ||
435 | |||
436 | |||
437 | /** | ||
438 | * Iterator over hash map entries that frees all entries | ||
439 | * associated with the given client. | ||
440 | * | ||
441 | * @param cls client to search for in source routes | ||
442 | * @param key current key code (ignored) | ||
443 | * @param value value in the hash map, a ClientQueryRecord | ||
444 | * @return #GNUNET_YES (we should continue to iterate) | ||
445 | */ | ||
446 | static int | ||
447 | remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value) | ||
448 | { | ||
449 | struct ClientList *client = cls; | ||
450 | struct ClientQueryRecord *record = value; | ||
451 | |||
452 | if (record->client != client) | ||
453 | return GNUNET_YES; | ||
454 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
455 | "Removing client %p's record for key %s\n", client, | ||
456 | GNUNET_h2s (key)); | ||
457 | GNUNET_assert (GNUNET_YES == | ||
458 | GNUNET_CONTAINER_multihashmap_remove (forward_map, key, | ||
459 | record)); | ||
460 | if (NULL != record->hnode) | ||
461 | GNUNET_CONTAINER_heap_remove_node (record->hnode); | ||
462 | GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0); | ||
463 | GNUNET_free (record); | ||
464 | return GNUNET_YES; | ||
465 | } | ||
466 | |||
467 | |||
468 | /** | ||
469 | * Iterator over hash map entries that send a given reply to | ||
470 | * each of the matching clients. With some tricky recycling | ||
471 | * of the buffer. | ||
472 | * | ||
473 | * @param cls the 'struct ForwardReplyContext' | ||
474 | * @param key current key | ||
475 | * @param value value in the hash map, a ClientQueryRecord | ||
476 | * @return GNUNET_YES (we should continue to iterate), | ||
477 | * if the result is mal-formed, GNUNET_NO | ||
478 | */ | ||
479 | static int | ||
480 | forward_reply (void *cls, const struct GNUNET_HashCode * key, void *value) | ||
481 | { | ||
482 | struct ForwardReplyContext *frc = cls; | ||
483 | struct ClientQueryRecord *record = value; | ||
484 | struct PendingMessage *pm; | ||
485 | struct GNUNET_DHT_ClientResultMessage *reply; | ||
486 | enum GNUNET_BLOCK_EvaluationResult eval; | ||
487 | int do_free; | ||
488 | struct GNUNET_HashCode ch; | ||
489 | unsigned int i; | ||
490 | |||
491 | LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, | ||
492 | "XVINE CLIENT-RESULT %s\n", | ||
493 | GNUNET_h2s_full (key)); | ||
494 | #if 0 | ||
495 | if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) | ||
496 | { | ||
497 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
498 | "Record type missmatch, not passing request for key %s to local client\n", | ||
499 | GNUNET_h2s (key)); | ||
500 | |||
501 | GNUNET_STATISTICS_update (GDS_stats, | ||
502 | gettext_noop | ||
503 | ("# Key match, type mismatches in REPLY to CLIENT"), | ||
504 | 1, GNUNET_NO); | ||
505 | return GNUNET_YES; /* type mismatch */ | ||
506 | } | ||
507 | #endif | ||
508 | GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); | ||
509 | for (i = 0; i < record->seen_replies_count; i++) | ||
510 | if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode))) | ||
511 | { | ||
512 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
513 | "Duplicate reply, not passing request for key %s to local client\n", | ||
514 | GNUNET_h2s (key)); | ||
515 | GNUNET_STATISTICS_update (GDS_stats, | ||
516 | gettext_noop | ||
517 | ("# Duplicate REPLIES to CLIENT request dropped"), | ||
518 | 1, GNUNET_NO); | ||
519 | return GNUNET_YES; /* duplicate */ | ||
520 | } | ||
521 | eval = | ||
522 | GNUNET_BLOCK_evaluate (GDS_block_context, | ||
523 | record->type, | ||
524 | GNUNET_BLOCK_EO_NONE, | ||
525 | key, NULL, 0, | ||
526 | record->xquery, | ||
527 | record->xquery_size, | ||
528 | frc->data, | ||
529 | frc->data_size); | ||
530 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
531 | "Evaluation result is %d for key %s for local client's query\n", | ||
532 | (int) eval, GNUNET_h2s (key)); | ||
533 | switch (eval) | ||
534 | { | ||
535 | case GNUNET_BLOCK_EVALUATION_OK_LAST: | ||
536 | do_free = GNUNET_YES; | ||
537 | break; | ||
538 | case GNUNET_BLOCK_EVALUATION_OK_MORE: | ||
539 | GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch); | ||
540 | do_free = GNUNET_NO; | ||
541 | break; | ||
542 | case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: | ||
543 | /* should be impossible to encounter here */ | ||
544 | GNUNET_break (0); | ||
545 | return GNUNET_YES; | ||
546 | case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: | ||
547 | GNUNET_break_op (0); | ||
548 | return GNUNET_NO; | ||
549 | case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: | ||
550 | GNUNET_break (0); | ||
551 | return GNUNET_NO; | ||
552 | case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: | ||
553 | GNUNET_break (0); | ||
554 | return GNUNET_NO; | ||
555 | case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT: | ||
556 | return GNUNET_YES; | ||
557 | case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: | ||
558 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
559 | _("Unsupported block type (%u) in request!\n"), record->type); | ||
560 | return GNUNET_NO; | ||
561 | default: | ||
562 | GNUNET_break (0); | ||
563 | return GNUNET_NO; | ||
564 | } | ||
565 | if (GNUNET_NO == frc->do_copy) | ||
566 | { | ||
567 | /* first time, we can use the original data */ | ||
568 | pm = frc->pm; | ||
569 | frc->do_copy = GNUNET_YES; | ||
570 | } | ||
571 | else | ||
572 | { | ||
573 | /* two clients waiting for same reply, must copy for queueing */ | ||
574 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + | ||
575 | ntohs (frc->pm->msg->size)); | ||
576 | memcpy (pm, frc->pm, | ||
577 | sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); | ||
578 | pm->next = pm->prev = NULL; | ||
579 | pm->msg = (struct GNUNET_MessageHeader *) &pm[1]; | ||
580 | } | ||
581 | GNUNET_STATISTICS_update (GDS_stats, | ||
582 | gettext_noop ("# RESULTS queued for clients"), 1, | ||
583 | GNUNET_NO); | ||
584 | reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; | ||
585 | reply->unique_id = record->unique_id; | ||
586 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
587 | "Queueing reply to query %s for client %p\n", | ||
588 | GNUNET_h2s (key), | ||
589 | record->client->client_handle); | ||
590 | add_pending_message (record->client, pm); | ||
591 | if (GNUNET_YES == do_free) | ||
592 | remove_client_records (record->client, key, record); | ||
593 | return GNUNET_YES; | ||
594 | } | ||
595 | |||
596 | |||
597 | /** | ||
598 | * Handle a reply we've received from another peer. If the reply | ||
599 | * matches any of our pending queries, forward it to the respective | ||
600 | * client(s). | ||
601 | * | ||
602 | * @param expiration when will the reply expire | ||
603 | * @param key the query this reply is for | ||
604 | * @param get_path_length number of peers in @a get_path | ||
605 | * @param get_path path the reply took on get | ||
606 | * @param put_path_length number of peers in @a put_path | ||
607 | * @param put_path path the reply took on put | ||
608 | * @param type type of the reply | ||
609 | * @param data_size number of bytes in @a data | ||
610 | * @param data application payload data | ||
611 | */ | ||
612 | void | ||
613 | GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, | ||
614 | const struct GNUNET_HashCode *key, | ||
615 | unsigned int get_path_length, | ||
616 | const struct GNUNET_PeerIdentity *get_path, | ||
617 | unsigned int put_path_length, | ||
618 | const struct GNUNET_PeerIdentity *put_path, | ||
619 | enum GNUNET_BLOCK_Type type, size_t data_size, | ||
620 | const void *data) | ||
621 | { | ||
622 | struct ForwardReplyContext frc; | ||
623 | struct PendingMessage *pm; | ||
624 | struct GNUNET_DHT_ClientResultMessage *reply; | ||
625 | struct GNUNET_PeerIdentity *paths; | ||
626 | size_t msize; | ||
627 | |||
628 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
629 | "reply for key %s\n", | ||
630 | GNUNET_h2s (key)); | ||
631 | |||
632 | if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, key)) | ||
633 | { | ||
634 | GNUNET_STATISTICS_update (GDS_stats, | ||
635 | gettext_noop | ||
636 | ("# REPLIES ignored for CLIENTS (no match)"), 1, | ||
637 | GNUNET_NO); | ||
638 | return; /* no matching request, fast exit! */ | ||
639 | } | ||
640 | msize = | ||
641 | sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size + | ||
642 | (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); | ||
643 | if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
644 | { | ||
645 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
646 | _("Could not pass reply to client, message too big!\n")); | ||
647 | return; | ||
648 | } | ||
649 | DEBUG("reply FOR DATA_SIZE = %lu\n",msize); | ||
650 | pm = GNUNET_malloc (msize + sizeof (struct PendingMessage)); | ||
651 | reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; | ||
652 | pm->msg = &reply->header; | ||
653 | reply->header.size = htons ((uint16_t) msize); | ||
654 | reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT); | ||
655 | reply->type = htonl (type); | ||
656 | reply->get_path_length = htonl (get_path_length); | ||
657 | reply->put_path_length = htonl (put_path_length); | ||
658 | reply->unique_id = 0; /* filled in later */ | ||
659 | reply->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
660 | reply->key = *key; | ||
661 | paths = (struct GNUNET_PeerIdentity *) &reply[1]; | ||
662 | memcpy (paths, put_path, | ||
663 | sizeof (struct GNUNET_PeerIdentity) * put_path_length); | ||
664 | memcpy (&paths[put_path_length], get_path, | ||
665 | sizeof (struct GNUNET_PeerIdentity) * get_path_length); | ||
666 | memcpy (&paths[get_path_length + put_path_length], data, data_size); | ||
667 | frc.do_copy = GNUNET_NO; | ||
668 | frc.pm = pm; | ||
669 | frc.data = data; | ||
670 | frc.data_size = data_size; | ||
671 | frc.type = type; | ||
672 | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key, &forward_reply, | ||
673 | &frc); | ||
674 | if (GNUNET_NO == frc.do_copy) | ||
675 | { | ||
676 | /* did not match any of the requests, free! */ | ||
677 | GNUNET_STATISTICS_update (GDS_stats, | ||
678 | gettext_noop | ||
679 | ("# REPLIES ignored for CLIENTS (no match)"), 1, | ||
680 | GNUNET_NO); | ||
681 | GNUNET_free (pm); | ||
682 | } | ||
683 | } | ||
684 | |||
685 | /** | ||
686 | * Check if some client is monitoring GET messages and notify | ||
687 | * them in that case. | ||
688 | * | ||
689 | * @param options Options, for instance RecordRoute, DemultiplexEverywhere. | ||
690 | * @param type The type of data in the request. | ||
691 | * @param hop_count Hop count so far. | ||
692 | * @param path_length number of entries in path (or 0 if not recorded). | ||
693 | * @param path peers on the GET path (or NULL if not recorded). | ||
694 | * @param desired_replication_level Desired replication level. | ||
695 | * @param key Key of the requested data. | ||
696 | */ | ||
697 | void | ||
698 | GDS_CLIENTS_process_get (uint32_t options, | ||
699 | enum GNUNET_BLOCK_Type type, | ||
700 | uint32_t hop_count, | ||
701 | uint32_t desired_replication_level, | ||
702 | unsigned int path_length, | ||
703 | const struct GNUNET_PeerIdentity *path, | ||
704 | const struct GNUNET_HashCode * key) | ||
705 | { | ||
706 | struct ClientMonitorRecord *m; | ||
707 | struct ClientList **cl; | ||
708 | unsigned int cl_size; | ||
709 | |||
710 | cl = NULL; | ||
711 | cl_size = 0; | ||
712 | for (m = monitor_head; NULL != m; m = m->next) | ||
713 | { | ||
714 | if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && | ||
715 | (NULL == m->key || | ||
716 | memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) | ||
717 | { | ||
718 | struct PendingMessage *pm; | ||
719 | struct GNUNET_DHT_MonitorGetMessage *mmsg; | ||
720 | struct GNUNET_PeerIdentity *msg_path; | ||
721 | size_t msize; | ||
722 | unsigned int i; | ||
723 | |||
724 | /* Don't send duplicates */ | ||
725 | for (i = 0; i < cl_size; i++) | ||
726 | if (cl[i] == m->client) | ||
727 | break; | ||
728 | if (i < cl_size) | ||
729 | continue; | ||
730 | GNUNET_array_append (cl, cl_size, m->client); | ||
731 | |||
732 | msize = path_length * sizeof (struct GNUNET_PeerIdentity); | ||
733 | msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); | ||
734 | msize += sizeof (struct PendingMessage); | ||
735 | pm = GNUNET_malloc (msize); | ||
736 | mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1]; | ||
737 | pm->msg = &mmsg->header; | ||
738 | mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); | ||
739 | mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); | ||
740 | mmsg->options = htonl(options); | ||
741 | mmsg->type = htonl(type); | ||
742 | mmsg->hop_count = htonl(hop_count); | ||
743 | mmsg->desired_replication_level = htonl(desired_replication_level); | ||
744 | mmsg->get_path_length = htonl(path_length); | ||
745 | memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); | ||
746 | msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | ||
747 | if (path_length > 0) | ||
748 | memcpy (msg_path, path, | ||
749 | path_length * sizeof (struct GNUNET_PeerIdentity)); | ||
750 | add_pending_message (m->client, pm); | ||
751 | } | ||
752 | } | ||
753 | GNUNET_free_non_null (cl); | ||
754 | } | ||
755 | |||
756 | |||
757 | /** | ||
758 | * Check if some client is monitoring PUT messages and notify | ||
759 | * them in that case. | ||
760 | * | ||
761 | * @param options Options, for instance RecordRoute, DemultiplexEverywhere. | ||
762 | * @param type The type of data in the request. | ||
763 | * @param hop_count Hop count so far. | ||
764 | * @param path_length number of entries in path (or 0 if not recorded). | ||
765 | * @param path peers on the PUT path (or NULL if not recorded). | ||
766 | * @param desired_replication_level Desired replication level. | ||
767 | * @param exp Expiration time of the data. | ||
768 | * @param key Key under which data is to be stored. | ||
769 | * @param data Pointer to the data carried. | ||
770 | * @param size Number of bytes in data. | ||
771 | */ | ||
772 | void | ||
773 | GDS_CLIENTS_process_put (uint32_t options, | ||
774 | enum GNUNET_BLOCK_Type type, | ||
775 | uint32_t hop_count, | ||
776 | uint32_t desired_replication_level, | ||
777 | unsigned int path_length, | ||
778 | const struct GNUNET_PeerIdentity *path, | ||
779 | struct GNUNET_TIME_Absolute exp, | ||
780 | const struct GNUNET_HashCode * key, | ||
781 | const void *data, | ||
782 | size_t size) | ||
783 | { | ||
784 | struct ClientMonitorRecord *m; | ||
785 | struct ClientList **cl; | ||
786 | unsigned int cl_size; | ||
787 | |||
788 | cl = NULL; | ||
789 | cl_size = 0; | ||
790 | for (m = monitor_head; NULL != m; m = m->next) | ||
791 | { | ||
792 | if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && | ||
793 | (NULL == m->key || | ||
794 | memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) | ||
795 | { | ||
796 | struct PendingMessage *pm; | ||
797 | struct GNUNET_DHT_MonitorPutMessage *mmsg; | ||
798 | struct GNUNET_PeerIdentity *msg_path; | ||
799 | size_t msize; | ||
800 | unsigned int i; | ||
801 | |||
802 | /* Don't send duplicates */ | ||
803 | for (i = 0; i < cl_size; i++) | ||
804 | if (cl[i] == m->client) | ||
805 | break; | ||
806 | if (i < cl_size) | ||
807 | continue; | ||
808 | GNUNET_array_append (cl, cl_size, m->client); | ||
809 | |||
810 | msize = size; | ||
811 | msize += path_length * sizeof (struct GNUNET_PeerIdentity); | ||
812 | msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); | ||
813 | msize += sizeof (struct PendingMessage); | ||
814 | pm = GNUNET_malloc (msize); | ||
815 | mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1]; | ||
816 | pm->msg = (struct GNUNET_MessageHeader *) mmsg; | ||
817 | mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); | ||
818 | mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); | ||
819 | mmsg->options = htonl(options); | ||
820 | mmsg->type = htonl(type); | ||
821 | mmsg->hop_count = htonl(hop_count); | ||
822 | mmsg->desired_replication_level = htonl(desired_replication_level); | ||
823 | mmsg->put_path_length = htonl(path_length); | ||
824 | msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | ||
825 | if (path_length > 0) | ||
826 | { | ||
827 | memcpy (msg_path, path, | ||
828 | path_length * sizeof (struct GNUNET_PeerIdentity)); | ||
829 | } | ||
830 | mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); | ||
831 | memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); | ||
832 | if (size > 0) | ||
833 | memcpy (&msg_path[path_length], data, size); | ||
834 | add_pending_message (m->client, pm); | ||
835 | } | ||
836 | } | ||
837 | GNUNET_free_non_null (cl); | ||
838 | } | ||
839 | |||
840 | |||
841 | /** | ||
842 | * Route the given request via the DHT. | ||
843 | */ | ||
844 | static void | ||
845 | transmit_request (struct ClientQueryRecord *cqr) | ||
846 | { | ||
847 | GNUNET_STATISTICS_update (GDS_stats, | ||
848 | gettext_noop | ||
849 | ("# GET requests from clients injected"), 1, | ||
850 | GNUNET_NO); | ||
851 | |||
852 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
853 | "Initiating GET for %s, replication %u, already have %u replies\n", | ||
854 | GNUNET_h2s (&cqr->key), | ||
855 | cqr->replication, | ||
856 | cqr->seen_replies_count); | ||
857 | |||
858 | GDS_NEIGHBOURS_handle_get (&cqr->key, cqr->type, cqr->msg_options, | ||
859 | cqr->replication); | ||
860 | |||
861 | /* exponential back-off for retries. | ||
862 | * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */ | ||
863 | cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency); | ||
864 | cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency); | ||
865 | } | ||
866 | |||
867 | |||
868 | /** | ||
869 | * Task that looks at the 'retry_heap' and transmits all of the requests | ||
870 | * on the heap that are ready for transmission. Then re-schedules | ||
871 | * itself (unless the heap is empty). | ||
872 | * | ||
873 | * @param cls unused | ||
874 | * @param tc scheduler context | ||
875 | */ | ||
876 | static void | ||
877 | transmit_next_request_task (void *cls, | ||
878 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
879 | { | ||
880 | struct ClientQueryRecord *cqr; | ||
881 | struct GNUNET_TIME_Relative delay; | ||
882 | |||
883 | retry_task = NULL; | ||
884 | if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
885 | return; | ||
886 | while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap))) | ||
887 | { | ||
888 | cqr->hnode = NULL; | ||
889 | delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time); | ||
890 | if (delay.rel_value_us > 0) | ||
891 | { | ||
892 | cqr->hnode = | ||
893 | GNUNET_CONTAINER_heap_insert (retry_heap, cqr, | ||
894 | cqr->retry_time.abs_value_us); | ||
895 | retry_task = | ||
896 | GNUNET_SCHEDULER_add_delayed (delay, &transmit_next_request_task, | ||
897 | NULL); | ||
898 | return; | ||
899 | } | ||
900 | transmit_request (cqr); | ||
901 | cqr->hnode = | ||
902 | GNUNET_CONTAINER_heap_insert (retry_heap, cqr, | ||
903 | cqr->retry_time.abs_value_us); | ||
904 | } | ||
905 | } | ||
906 | |||
907 | |||
908 | /** | ||
909 | * Handler for PUT messages. | ||
910 | * | ||
911 | * @param cls closure for the service | ||
912 | * @param client the client we received this message from | ||
913 | * @param message the actual message received | ||
914 | */ | ||
915 | static void | ||
916 | handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, | ||
917 | const struct GNUNET_MessageHeader *message) | ||
918 | { | ||
919 | const struct GNUNET_DHT_ClientPutMessage *put_msg; | ||
920 | struct PendingMessage *pm; | ||
921 | struct GNUNET_DHT_ClientPutConfirmationMessage *conf; | ||
922 | uint16_t size; | ||
923 | |||
924 | size = ntohs (message->size); | ||
925 | if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) | ||
926 | { | ||
927 | GNUNET_break (0); | ||
928 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
929 | return; | ||
930 | } | ||
931 | GNUNET_STATISTICS_update (GDS_stats, | ||
932 | gettext_noop | ||
933 | ("# PUT requests received from clients"), 1, | ||
934 | GNUNET_NO); | ||
935 | put_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; | ||
936 | LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "X-VINE DHT CLIENT-PUT %s\n", | ||
937 | GNUNET_h2s_full (&put_msg->key)); | ||
938 | /* give to local clients */ | ||
939 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
940 | "Handling local PUT of %u-bytes for query %s\n", | ||
941 | size - sizeof (struct GNUNET_DHT_ClientPutMessage), | ||
942 | GNUNET_h2s (&put_msg->key)); | ||
943 | DEBUG("PUT doing put i = %s\n",GNUNET_h2s(&(put_msg->key))); | ||
944 | GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put_msg->expiration), | ||
945 | &put_msg->key, 0, NULL, 0, NULL, | ||
946 | ntohl (put_msg->type), | ||
947 | size - sizeof (struct GNUNET_DHT_ClientPutMessage), | ||
948 | &put_msg[1]); | ||
949 | |||
950 | GDS_NEIGHBOURS_handle_put (&put_msg->key, | ||
951 | ntohl (put_msg->type), ntohl (put_msg->options), | ||
952 | ntohl (put_msg->desired_replication_level), | ||
953 | GNUNET_TIME_absolute_ntoh (put_msg->expiration), | ||
954 | &put_msg[1], | ||
955 | size - sizeof (struct GNUNET_DHT_ClientPutMessage)); | ||
956 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + | ||
957 | sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); | ||
958 | conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1]; | ||
959 | conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); | ||
960 | conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); | ||
961 | conf->reserved = htonl (0); | ||
962 | conf->unique_id = put_msg->unique_id; | ||
963 | pm->msg = &conf->header; | ||
964 | add_pending_message (find_active_client (client), pm); | ||
965 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
966 | } | ||
967 | |||
968 | |||
969 | /** | ||
970 | * Handler for DHT GET messages from the client. | ||
971 | * | ||
972 | * @param cls closure for the service | ||
973 | * @param client the client we received this message from | ||
974 | * @param message the actual message received | ||
975 | */ | ||
976 | static void | ||
977 | handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, | ||
978 | const struct GNUNET_MessageHeader *message) | ||
979 | { | ||
980 | const struct GNUNET_DHT_ClientGetMessage *get; | ||
981 | struct ClientQueryRecord *cqr; | ||
982 | size_t xquery_size; | ||
983 | const char *xquery; | ||
984 | uint16_t size; | ||
985 | |||
986 | size = ntohs (message->size); | ||
987 | if (size < sizeof (struct GNUNET_DHT_ClientGetMessage)) | ||
988 | { | ||
989 | GNUNET_break (0); | ||
990 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
991 | return; | ||
992 | } | ||
993 | xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); | ||
994 | get = (const struct GNUNET_DHT_ClientGetMessage *) message; | ||
995 | xquery = (const char *) &get[1]; | ||
996 | GNUNET_STATISTICS_update (GDS_stats, | ||
997 | gettext_noop | ||
998 | ("# GET requests received from clients"), 1, | ||
999 | GNUNET_NO); | ||
1000 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1001 | "Received GET request for %s from local client %p, xq: %.*s\n", | ||
1002 | GNUNET_h2s (&get->key), client, xquery_size, xquery); | ||
1003 | |||
1004 | LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "X-VINE CLIENT-GET %s\n", | ||
1005 | GNUNET_h2s_full (&get->key)); | ||
1006 | |||
1007 | |||
1008 | cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); | ||
1009 | cqr->key = get->key; | ||
1010 | cqr->client = find_active_client (client); | ||
1011 | cqr->xquery = (void *) &cqr[1]; | ||
1012 | memcpy (&cqr[1], xquery, xquery_size); | ||
1013 | cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); | ||
1014 | cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS; | ||
1015 | cqr->retry_time = GNUNET_TIME_absolute_get (); | ||
1016 | cqr->unique_id = get->unique_id; | ||
1017 | cqr->xquery_size = xquery_size; | ||
1018 | cqr->replication = ntohl (get->desired_replication_level); | ||
1019 | cqr->msg_options = ntohl (get->options); | ||
1020 | cqr->type = ntohl (get->type); | ||
1021 | |||
1022 | // FIXME use cqr->key, set multihashmap create to GNUNET_YES | ||
1023 | GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, | ||
1024 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1025 | |||
1026 | struct GNUNET_PeerIdentity my_identity; | ||
1027 | my_identity = GDS_NEIGHBOURS_get_my_id(); | ||
1028 | GDS_CLIENTS_process_get (ntohl (get->options), | ||
1029 | ntohl (get->type), | ||
1030 | 0, | ||
1031 | ntohl (get->desired_replication_level), | ||
1032 | 1, | ||
1033 | &my_identity, | ||
1034 | &get->key); | ||
1035 | /* start remote requests */ | ||
1036 | if (NULL != retry_task) | ||
1037 | GNUNET_SCHEDULER_cancel (retry_task); | ||
1038 | retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL); | ||
1039 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1040 | } | ||
1041 | |||
1042 | |||
1043 | /** | ||
1044 | * Closure for 'find_by_unique_id'. | ||
1045 | */ | ||
1046 | struct FindByUniqueIdContext | ||
1047 | { | ||
1048 | /** | ||
1049 | * Where to store the result, if found. | ||
1050 | */ | ||
1051 | struct ClientQueryRecord *cqr; | ||
1052 | |||
1053 | uint64_t unique_id; | ||
1054 | }; | ||
1055 | |||
1056 | |||
1057 | /** | ||
1058 | * Function called for each existing DHT record for the given | ||
1059 | * query. Checks if it matches the UID given in the closure | ||
1060 | * and if so returns the entry as a result. | ||
1061 | * | ||
1062 | * @param cls the search context | ||
1063 | * @param key query for the lookup (not used) | ||
1064 | * @param value the 'struct ClientQueryRecord' | ||
1065 | * @return GNUNET_YES to continue iteration (result not yet found) | ||
1066 | */ | ||
1067 | static int | ||
1068 | find_by_unique_id (void *cls, | ||
1069 | const struct GNUNET_HashCode *key, | ||
1070 | void *value) | ||
1071 | { | ||
1072 | struct FindByUniqueIdContext *fui_ctx = cls; | ||
1073 | struct ClientQueryRecord *cqr = value; | ||
1074 | |||
1075 | if (cqr->unique_id != fui_ctx->unique_id) | ||
1076 | return GNUNET_YES; | ||
1077 | fui_ctx->cqr = cqr; | ||
1078 | return GNUNET_NO; | ||
1079 | } | ||
1080 | |||
1081 | |||
1082 | /** | ||
1083 | * Handler for "GET result seen" messages from the client. | ||
1084 | * | ||
1085 | * @param cls closure for the service | ||
1086 | * @param client the client we received this message from | ||
1087 | * @param message the actual message received | ||
1088 | */ | ||
1089 | static void | ||
1090 | handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, | ||
1091 | const struct GNUNET_MessageHeader *message) | ||
1092 | { | ||
1093 | const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; | ||
1094 | uint16_t size; | ||
1095 | unsigned int hash_count; | ||
1096 | unsigned int old_count; | ||
1097 | const struct GNUNET_HashCode *hc; | ||
1098 | struct FindByUniqueIdContext fui_ctx; | ||
1099 | struct ClientQueryRecord *cqr; | ||
1100 | |||
1101 | size = ntohs (message->size); | ||
1102 | if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) | ||
1103 | { | ||
1104 | GNUNET_break (0); | ||
1105 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
1106 | return; | ||
1107 | } | ||
1108 | seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message; | ||
1109 | hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); | ||
1110 | if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode)) | ||
1111 | { | ||
1112 | GNUNET_break (0); | ||
1113 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
1114 | return; | ||
1115 | } | ||
1116 | hc = (const struct GNUNET_HashCode*) &seen[1]; | ||
1117 | fui_ctx.unique_id = seen->unique_id; | ||
1118 | fui_ctx.cqr = NULL; | ||
1119 | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, | ||
1120 | &seen->key, | ||
1121 | &find_by_unique_id, | ||
1122 | &fui_ctx); | ||
1123 | if (NULL == (cqr = fui_ctx.cqr)) | ||
1124 | { | ||
1125 | GNUNET_break (0); | ||
1126 | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | ||
1127 | return; | ||
1128 | } | ||
1129 | /* finally, update 'seen' list */ | ||
1130 | old_count = cqr->seen_replies_count; | ||
1131 | GNUNET_array_grow (cqr->seen_replies, | ||
1132 | cqr->seen_replies_count, | ||
1133 | cqr->seen_replies_count + hash_count); | ||
1134 | memcpy (&cqr->seen_replies[old_count], | ||
1135 | hc, | ||
1136 | sizeof (struct GNUNET_HashCode) * hash_count); | ||
1137 | } | ||
1138 | |||
1139 | |||
1140 | /** | ||
1141 | * Closure for 'remove_by_unique_id'. | ||
1142 | */ | ||
1143 | struct RemoveByUniqueIdContext | ||
1144 | { | ||
1145 | /** | ||
1146 | * Client that issued the removal request. | ||
1147 | */ | ||
1148 | struct ClientList *client; | ||
1149 | |||
1150 | /** | ||
1151 | * Unique ID of the request. | ||
1152 | */ | ||
1153 | uint64_t unique_id; | ||
1154 | }; | ||
1155 | |||
1156 | |||
1157 | /** | ||
1158 | * Iterator over hash map entries that frees all entries | ||
1159 | * that match the given client and unique ID. | ||
1160 | * | ||
1161 | * @param cls unique ID and client to search for in source routes | ||
1162 | * @param key current key code | ||
1163 | * @param value value in the hash map, a ClientQueryRecord | ||
1164 | * @return GNUNET_YES (we should continue to iterate) | ||
1165 | */ | ||
1166 | static int | ||
1167 | remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value) | ||
1168 | { | ||
1169 | const struct RemoveByUniqueIdContext *ctx = cls; | ||
1170 | struct ClientQueryRecord *record = value; | ||
1171 | |||
1172 | if (record->unique_id != ctx->unique_id) | ||
1173 | return GNUNET_YES; | ||
1174 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1175 | "Removing client %p's record for key %s (by unique id)\n", | ||
1176 | ctx->client->client_handle, GNUNET_h2s (key)); | ||
1177 | return remove_client_records (ctx->client, key, record); | ||
1178 | } | ||
1179 | |||
1180 | |||
1181 | /** | ||
1182 | * Handler for any generic DHT stop messages, calls the appropriate handler | ||
1183 | * depending on message type (if processed locally) | ||
1184 | * | ||
1185 | * @param cls closure for the service | ||
1186 | * @param client the client we received this message from | ||
1187 | * @param message the actual message received | ||
1188 | * | ||
1189 | */ | ||
1190 | static void | ||
1191 | handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, | ||
1192 | const struct GNUNET_MessageHeader *message) | ||
1193 | { | ||
1194 | const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = | ||
1195 | (const struct GNUNET_DHT_ClientGetStopMessage *) message; | ||
1196 | struct RemoveByUniqueIdContext ctx; | ||
1197 | |||
1198 | GNUNET_STATISTICS_update (GDS_stats, | ||
1199 | gettext_noop | ||
1200 | ("# GET STOP requests received from clients"), 1, | ||
1201 | GNUNET_NO); | ||
1202 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1203 | "Received GET STOP request for %s from local client %p\n", | ||
1204 | client, GNUNET_h2s (&dht_stop_msg->key)); | ||
1205 | ctx.client = find_active_client (client); | ||
1206 | ctx.unique_id = dht_stop_msg->unique_id; | ||
1207 | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key, | ||
1208 | &remove_by_unique_id, &ctx); | ||
1209 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1210 | } | ||
1211 | |||
1212 | |||
1213 | /** | ||
1214 | * Handler for monitor start messages | ||
1215 | * | ||
1216 | * @param cls closure for the service | ||
1217 | * @param client the client we received this message from | ||
1218 | * @param message the actual message received | ||
1219 | * | ||
1220 | */ | ||
1221 | static void | ||
1222 | handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, | ||
1223 | const struct GNUNET_MessageHeader *message) | ||
1224 | { | ||
1225 | struct ClientMonitorRecord *r; | ||
1226 | const struct GNUNET_DHT_MonitorStartStopMessage *msg; | ||
1227 | |||
1228 | msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; | ||
1229 | r = GNUNET_new (struct ClientMonitorRecord); | ||
1230 | |||
1231 | r->client = find_active_client(client); | ||
1232 | r->type = ntohl(msg->type); | ||
1233 | r->get = ntohs(msg->get); | ||
1234 | r->get_resp = ntohs(msg->get_resp); | ||
1235 | r->put = ntohs(msg->put); | ||
1236 | if (0 == ntohs(msg->filter_key)) | ||
1237 | r->key = NULL; | ||
1238 | else | ||
1239 | { | ||
1240 | r->key = GNUNET_new (struct GNUNET_HashCode); | ||
1241 | memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode)); | ||
1242 | } | ||
1243 | GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); | ||
1244 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1245 | } | ||
1246 | |||
1247 | |||
1248 | /** | ||
1249 | * Handler for monitor stop messages | ||
1250 | * | ||
1251 | * @param cls closure for the service | ||
1252 | * @param client the client we received this message from | ||
1253 | * @param message the actual message received | ||
1254 | * | ||
1255 | */ | ||
1256 | static void | ||
1257 | handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client, | ||
1258 | const struct GNUNET_MessageHeader *message) | ||
1259 | { | ||
1260 | struct ClientMonitorRecord *r; | ||
1261 | const struct GNUNET_DHT_MonitorStartStopMessage *msg; | ||
1262 | int keys_match; | ||
1263 | |||
1264 | msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; | ||
1265 | r = monitor_head; | ||
1266 | |||
1267 | while (NULL != r) | ||
1268 | { | ||
1269 | if (NULL == r->key) | ||
1270 | keys_match = (0 == ntohs(msg->filter_key)); | ||
1271 | else | ||
1272 | { | ||
1273 | keys_match = (0 != ntohs(msg->filter_key) | ||
1274 | && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode))); | ||
1275 | } | ||
1276 | if (find_active_client(client) == r->client | ||
1277 | && ntohl(msg->type) == r->type | ||
1278 | && r->get == msg->get | ||
1279 | && r->get_resp == msg->get_resp | ||
1280 | && r->put == msg->put | ||
1281 | && keys_match | ||
1282 | ) | ||
1283 | { | ||
1284 | GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r); | ||
1285 | GNUNET_free_non_null (r->key); | ||
1286 | GNUNET_free (r); | ||
1287 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1288 | return; /* Delete only ONE entry */ | ||
1289 | } | ||
1290 | r = r->next; | ||
1291 | } | ||
1292 | |||
1293 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1294 | } | ||
1295 | |||
1296 | |||
1297 | #if ENABLE_MALICIOUS | ||
1298 | /** | ||
1299 | * Handler for act malicious message. | ||
1300 | * | ||
1301 | * @param cls closure for the service | ||
1302 | * @param client the client we received this message from | ||
1303 | * @param message the actual message received | ||
1304 | * | ||
1305 | */ | ||
1306 | static void | ||
1307 | handle_dht_act_malicious (void *cls, struct GNUNET_SERVER_Client *client, | ||
1308 | const struct GNUNET_MessageHeader *message) | ||
1309 | { | ||
1310 | const struct GNUNET_DHT_ActMaliciousMessage *msg; | ||
1311 | struct PendingMessage *pm; | ||
1312 | struct GNUNET_DHT_ClientActMaliciousConfirmationMessage *conf; | ||
1313 | unsigned int malicious_action; | ||
1314 | |||
1315 | msg = (const struct GNUNET_DHT_ActMaliciousMessage *)message; | ||
1316 | malicious_action = msg->action; | ||
1317 | |||
1318 | if(GNUNET_OK == GDS_NEIGHBOURS_act_malicious (malicious_action)) | ||
1319 | { | ||
1320 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + | ||
1321 | sizeof (struct GNUNET_DHT_ClientActMaliciousConfirmationMessage)); | ||
1322 | conf = (struct GNUNET_DHT_ClientActMaliciousConfirmationMessage *) &pm[1]; | ||
1323 | conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientActMaliciousConfirmationMessage)); | ||
1324 | conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_ACT_MALICIOUS_OK); | ||
1325 | pm->msg = &conf->header; | ||
1326 | add_pending_message (find_active_client (client), pm); | ||
1327 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1328 | } | ||
1329 | } | ||
1330 | #endif | ||
1331 | |||
1332 | |||
1333 | /** | ||
1334 | * Functions with this signature are called whenever a client | ||
1335 | * is disconnected on the network level. | ||
1336 | * | ||
1337 | * @param cls closure (NULL for dht) | ||
1338 | * @param client identification of the client; NULL | ||
1339 | * for the last call when the server is destroyed | ||
1340 | */ | ||
1341 | static void | ||
1342 | handle_client_disconnect (void *cls, | ||
1343 | struct GNUNET_SERVER_Client *client) | ||
1344 | { | ||
1345 | struct ClientList *pos; | ||
1346 | struct PendingMessage *reply; | ||
1347 | struct ClientMonitorRecord *monitor; | ||
1348 | |||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1350 | "Local client %p disconnects\n", | ||
1351 | client); | ||
1352 | pos = find_active_client (client); | ||
1353 | GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); | ||
1354 | if (pos->transmit_handle != NULL) | ||
1355 | GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle); | ||
1356 | while (NULL != (reply = pos->pending_head)) | ||
1357 | { | ||
1358 | GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply); | ||
1359 | GNUNET_free (reply); | ||
1360 | } | ||
1361 | monitor = monitor_head; | ||
1362 | while (NULL != monitor) | ||
1363 | { | ||
1364 | if (monitor->client == pos) | ||
1365 | { | ||
1366 | struct ClientMonitorRecord *next; | ||
1367 | |||
1368 | GNUNET_free_non_null (monitor->key); | ||
1369 | next = monitor->next; | ||
1370 | GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor); | ||
1371 | GNUNET_free (monitor); | ||
1372 | monitor = next; | ||
1373 | } | ||
1374 | else | ||
1375 | monitor = monitor->next; | ||
1376 | } | ||
1377 | GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records, | ||
1378 | pos); | ||
1379 | GNUNET_free (pos); | ||
1380 | } | ||
1381 | |||
1382 | |||
1383 | |||
1384 | /** | ||
1385 | * Initialize client subsystem. | ||
1386 | * | ||
1387 | * @param server the initialized server | ||
1388 | */ | ||
1389 | void | ||
1390 | GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) | ||
1391 | { | ||
1392 | static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { | ||
1393 | {&handle_dht_local_put, NULL, | ||
1394 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, | ||
1395 | {&handle_dht_local_get, NULL, | ||
1396 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, | ||
1397 | {&handle_dht_local_get_stop, NULL, | ||
1398 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, | ||
1399 | sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, | ||
1400 | {&handle_dht_local_monitor, NULL, | ||
1401 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, | ||
1402 | sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, | ||
1403 | {&handle_dht_local_monitor_stop, NULL, | ||
1404 | GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, | ||
1405 | sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, | ||
1406 | {&handle_dht_local_get_result_seen, NULL, | ||
1407 | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0}, | ||
1408 | #if ENABLE_MALICIOUS | ||
1409 | {&handle_dht_act_malicious, NULL, | ||
1410 | GNUNET_MESSAGE_TYPE_DHT_ACT_MALICIOUS, | ||
1411 | sizeof (struct GNUNET_DHT_ActMaliciousMessage)}, | ||
1412 | #endif | ||
1413 | {NULL, NULL, 0, 0} | ||
1414 | }; | ||
1415 | forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); | ||
1416 | retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | ||
1417 | GNUNET_SERVER_add_handlers (server, plugin_handlers); | ||
1418 | GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); | ||
1419 | } | ||
1420 | |||
1421 | |||
1422 | /** | ||
1423 | * Shutdown client subsystem. | ||
1424 | */ | ||
1425 | void | ||
1426 | GDS_CLIENTS_done () | ||
1427 | { | ||
1428 | GNUNET_assert (client_head == NULL); | ||
1429 | GNUNET_assert (client_tail == NULL); | ||
1430 | if (NULL != retry_task) | ||
1431 | { | ||
1432 | GNUNET_SCHEDULER_cancel (retry_task); | ||
1433 | retry_task = NULL; | ||
1434 | } | ||
1435 | if (NULL != retry_heap) | ||
1436 | { | ||
1437 | GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); | ||
1438 | GNUNET_CONTAINER_heap_destroy (retry_heap); | ||
1439 | retry_heap = NULL; | ||
1440 | } | ||
1441 | if (NULL != forward_map) | ||
1442 | { | ||
1443 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); | ||
1444 | GNUNET_CONTAINER_multihashmap_destroy (forward_map); | ||
1445 | forward_map = NULL; | ||
1446 | } | ||
1447 | } | ||
1448 | |||
1449 | /* end of gnunet-service-dht_clients.c */ | ||