diff options
Diffstat (limited to 'src/psycstore/gnunet-service-psycstore.c')
-rw-r--r-- | src/psycstore/gnunet-service-psycstore.c | 1049 |
1 files changed, 1049 insertions, 0 deletions
diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c new file mode 100644 index 0000000..9aebd3e --- /dev/null +++ b/src/psycstore/gnunet-service-psycstore.c | |||
@@ -0,0 +1,1049 @@ | |||
1 | /** | ||
2 | * This file is part of GNUnet | ||
3 | * Copyright (C) 2013 GNUnet e.V. | ||
4 | * | ||
5 | * GNUnet is free software: you can redistribute it and/or modify it | ||
6 | * under the terms of the GNU Affero General Public License as published | ||
7 | * by the Free Software Foundation, either version 3 of the License, | ||
8 | * or (at your option) any later version. | ||
9 | * | ||
10 | * GNUnet is distributed in the hope that it will be useful, but | ||
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | * Affero General Public License for more details. | ||
14 | * | ||
15 | * You should have received a copy of the GNU Affero General Public License | ||
16 | * along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file psycstore/gnunet-service-psycstore.c | ||
23 | * @brief PSYCstore service | ||
24 | * @author Gabor X Toth | ||
25 | * @author Christian Grothoff | ||
26 | */ | ||
27 | |||
28 | #include <inttypes.h> | ||
29 | |||
30 | #include "platform.h" | ||
31 | #include "gnunet_util_lib.h" | ||
32 | #include "gnunet_constants.h" | ||
33 | #include "gnunet_protocols.h" | ||
34 | #include "gnunet_statistics_service.h" | ||
35 | #include "gnunet_psyc_util_lib.h" | ||
36 | #include "gnunet_psycstore_service.h" | ||
37 | #include "gnunet_psycstore_plugin.h" | ||
38 | #include "psycstore.h" | ||
39 | |||
40 | |||
41 | /** | ||
42 | * Handle to our current configuration. | ||
43 | */ | ||
44 | static const struct GNUNET_CONFIGURATION_Handle *cfg; | ||
45 | |||
46 | /** | ||
47 | * Service handle. | ||
48 | */ | ||
49 | static struct GNUNET_SERVICE_Handle *service; | ||
50 | |||
51 | /** | ||
52 | * Handle to the statistics service. | ||
53 | */ | ||
54 | static struct GNUNET_STATISTICS_Handle *stats; | ||
55 | |||
56 | /** | ||
57 | * Database handle | ||
58 | */ | ||
59 | static struct GNUNET_PSYCSTORE_PluginFunctions *db; | ||
60 | |||
61 | /** | ||
62 | * Name of the database plugin | ||
63 | */ | ||
64 | static char *db_lib_name; | ||
65 | |||
66 | |||
67 | /** | ||
68 | * Task run during shutdown. | ||
69 | * | ||
70 | * @param cls unused | ||
71 | */ | ||
72 | static void | ||
73 | shutdown_task (void *cls) | ||
74 | { | ||
75 | if (NULL != stats) | ||
76 | { | ||
77 | GNUNET_STATISTICS_destroy (stats, GNUNET_NO); | ||
78 | stats = NULL; | ||
79 | } | ||
80 | GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db)); | ||
81 | GNUNET_free (db_lib_name); | ||
82 | db_lib_name = NULL; | ||
83 | } | ||
84 | |||
85 | |||
86 | /** | ||
87 | * Send a result code back to the client. | ||
88 | * | ||
89 | * @param client | ||
90 | * Client that should receive the result code. | ||
91 | * @param result_code | ||
92 | * Code to transmit. | ||
93 | * @param op_id | ||
94 | * Operation ID in network byte order. | ||
95 | * @param err_msg | ||
96 | * Error message to include (or NULL for none). | ||
97 | */ | ||
98 | static void | ||
99 | send_result_code (struct GNUNET_SERVICE_Client *client, | ||
100 | uint64_t op_id, | ||
101 | int64_t result_code, | ||
102 | const char *err_msg) | ||
103 | { | ||
104 | struct OperationResult *res; | ||
105 | size_t err_size = 0; | ||
106 | |||
107 | if (NULL != err_msg) | ||
108 | err_size = strnlen (err_msg, | ||
109 | GNUNET_MAX_MESSAGE_SIZE - sizeof (*res) - 1) + 1; | ||
110 | struct GNUNET_MQ_Envelope * | ||
111 | env = GNUNET_MQ_msg_extra (res, err_size, | ||
112 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE); | ||
113 | res->result_code = GNUNET_htonll (result_code - INT64_MIN); | ||
114 | res->op_id = op_id; | ||
115 | if (0 < err_size) | ||
116 | { | ||
117 | GNUNET_memcpy (&res[1], err_msg, err_size); | ||
118 | ((char *) &res[1])[err_size - 1] = '\0'; | ||
119 | } | ||
120 | |||
121 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
122 | "Sending result to client: %" PRId64 " (%s)\n", | ||
123 | result_code, err_msg); | ||
124 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
125 | } | ||
126 | |||
127 | |||
128 | enum | ||
129 | { | ||
130 | MEMBERSHIP_TEST_NOT_NEEDED = 0, | ||
131 | MEMBERSHIP_TEST_NEEDED = 1, | ||
132 | MEMBERSHIP_TEST_DONE = 2, | ||
133 | } MessageMembershipTest; | ||
134 | |||
135 | |||
136 | struct SendClosure | ||
137 | { | ||
138 | struct GNUNET_SERVICE_Client *client; | ||
139 | |||
140 | /** | ||
141 | * Channel's public key. | ||
142 | */ | ||
143 | struct GNUNET_CRYPTO_EddsaPublicKey channel_key; | ||
144 | |||
145 | /** | ||
146 | * Slave's public key. | ||
147 | */ | ||
148 | struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; | ||
149 | |||
150 | /** | ||
151 | * Operation ID. | ||
152 | */ | ||
153 | uint64_t op_id; | ||
154 | |||
155 | /** | ||
156 | * Membership test result. | ||
157 | */ | ||
158 | int membership_test_result; | ||
159 | |||
160 | /** | ||
161 | * Do membership test with @a slave_key before returning fragment? | ||
162 | * @see enum MessageMembershipTest | ||
163 | */ | ||
164 | uint8_t membership_test; | ||
165 | }; | ||
166 | |||
167 | |||
168 | static int | ||
169 | send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, | ||
170 | enum GNUNET_PSYCSTORE_MessageFlags flags) | ||
171 | { | ||
172 | struct SendClosure *sc = cls; | ||
173 | struct FragmentResult *res; | ||
174 | |||
175 | if (MEMBERSHIP_TEST_NEEDED == sc->membership_test) | ||
176 | { | ||
177 | sc->membership_test = MEMBERSHIP_TEST_DONE; | ||
178 | sc->membership_test_result | ||
179 | = db->membership_test (db->cls, &sc->channel_key, &sc->slave_key, | ||
180 | GNUNET_ntohll (msg->message_id)); | ||
181 | switch (sc->membership_test_result) | ||
182 | { | ||
183 | case GNUNET_YES: | ||
184 | break; | ||
185 | |||
186 | case GNUNET_NO: | ||
187 | case GNUNET_SYSERR: | ||
188 | return GNUNET_NO; | ||
189 | } | ||
190 | } | ||
191 | |||
192 | size_t msg_size = ntohs (msg->header.size); | ||
193 | |||
194 | struct GNUNET_MQ_Envelope * | ||
195 | env = GNUNET_MQ_msg_extra (res, msg_size, | ||
196 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT); | ||
197 | res->op_id = sc->op_id; | ||
198 | res->psycstore_flags = htonl (flags); | ||
199 | GNUNET_memcpy (&res[1], msg, msg_size); | ||
200 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
201 | "Sending fragment %llu to client\n", | ||
202 | (unsigned long long) GNUNET_ntohll (msg->fragment_id)); | ||
203 | GNUNET_free (msg); | ||
204 | |||
205 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (sc->client), env); | ||
206 | return GNUNET_YES; | ||
207 | } | ||
208 | |||
209 | |||
210 | static int | ||
211 | send_state_var (void *cls, const char *name, | ||
212 | const void *value, uint32_t value_size) | ||
213 | { | ||
214 | struct SendClosure *sc = cls; | ||
215 | struct StateResult *res; | ||
216 | size_t name_size = strlen (name) + 1; | ||
217 | |||
218 | /** @todo FIXME: split up value into 64k chunks */ | ||
219 | |||
220 | struct GNUNET_MQ_Envelope * | ||
221 | env = GNUNET_MQ_msg_extra (res, name_size + value_size, | ||
222 | GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE); | ||
223 | res->op_id = sc->op_id; | ||
224 | res->name_size = htons (name_size); | ||
225 | GNUNET_memcpy (&res[1], name, name_size); | ||
226 | GNUNET_memcpy ((char *) &res[1] + name_size, value, value_size); | ||
227 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
228 | "Sending state variable %s to client\n", name); | ||
229 | |||
230 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (sc->client), env); | ||
231 | return GNUNET_OK; | ||
232 | } | ||
233 | |||
234 | |||
235 | static void | ||
236 | handle_client_membership_store (void *cls, | ||
237 | const struct MembershipStoreRequest *req) | ||
238 | { | ||
239 | struct GNUNET_SERVICE_Client *client = cls; | ||
240 | |||
241 | int ret = db->membership_store (db->cls, &req->channel_key, &req->slave_key, | ||
242 | req->did_join, | ||
243 | GNUNET_ntohll (req->announced_at), | ||
244 | GNUNET_ntohll (req->effective_since), | ||
245 | GNUNET_ntohll (req->group_generation)); | ||
246 | |||
247 | if (ret != GNUNET_OK) | ||
248 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
249 | _("Failed to store membership information!\n")); | ||
250 | |||
251 | send_result_code (client, req->op_id, ret, NULL); | ||
252 | GNUNET_SERVICE_client_continue (client); | ||
253 | } | ||
254 | |||
255 | |||
256 | static void | ||
257 | handle_client_membership_test (void *cls, | ||
258 | const struct MembershipTestRequest *req) | ||
259 | { | ||
260 | struct GNUNET_SERVICE_Client *client = cls; | ||
261 | |||
262 | int ret = db->membership_test (db->cls, &req->channel_key, &req->slave_key, | ||
263 | GNUNET_ntohll (req->message_id)); | ||
264 | switch (ret) | ||
265 | { | ||
266 | case GNUNET_YES: | ||
267 | case GNUNET_NO: | ||
268 | break; | ||
269 | default: | ||
270 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
271 | _("Failed to test membership!\n")); | ||
272 | } | ||
273 | |||
274 | send_result_code (client, req->op_id, ret, NULL); | ||
275 | GNUNET_SERVICE_client_continue (client); | ||
276 | } | ||
277 | |||
278 | |||
279 | static int | ||
280 | check_client_fragment_store (void *cls, | ||
281 | const struct FragmentStoreRequest *req) | ||
282 | { | ||
283 | return GNUNET_OK; | ||
284 | } | ||
285 | |||
286 | |||
287 | static void | ||
288 | handle_client_fragment_store (void *cls, | ||
289 | const struct FragmentStoreRequest *req) | ||
290 | { | ||
291 | struct GNUNET_SERVICE_Client *client = cls; | ||
292 | |||
293 | const struct GNUNET_MessageHeader * | ||
294 | msg = GNUNET_MQ_extract_nested_mh (req); | ||
295 | if (NULL == msg | ||
296 | || ntohs (msg->size) < sizeof (struct GNUNET_MULTICAST_MessageHeader)) | ||
297 | { | ||
298 | GNUNET_break (0); | ||
299 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
300 | _("Dropping invalid fragment\n")); | ||
301 | GNUNET_SERVICE_client_drop (client); | ||
302 | return; | ||
303 | } | ||
304 | |||
305 | int ret = db->fragment_store (db->cls, &req->channel_key, | ||
306 | (const struct GNUNET_MULTICAST_MessageHeader *) | ||
307 | msg, ntohl (req->psycstore_flags)); | ||
308 | |||
309 | if (ret != GNUNET_OK) | ||
310 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
311 | _("Failed to store fragment\n")); | ||
312 | |||
313 | send_result_code (client, req->op_id, ret, NULL); | ||
314 | GNUNET_SERVICE_client_continue (client); | ||
315 | } | ||
316 | |||
317 | |||
318 | static void | ||
319 | handle_client_fragment_get (void *cls, | ||
320 | const struct FragmentGetRequest *req) | ||
321 | { | ||
322 | struct GNUNET_SERVICE_Client *client = cls; | ||
323 | |||
324 | struct SendClosure | ||
325 | sc = { .op_id = req->op_id, | ||
326 | .client = client, | ||
327 | .channel_key = req->channel_key, | ||
328 | .slave_key = req->slave_key, | ||
329 | .membership_test = req->do_membership_test }; | ||
330 | |||
331 | int64_t ret; | ||
332 | uint64_t ret_frags = 0; | ||
333 | uint64_t first_fragment_id = GNUNET_ntohll (req->first_fragment_id); | ||
334 | uint64_t last_fragment_id = GNUNET_ntohll (req->last_fragment_id); | ||
335 | uint64_t limit = GNUNET_ntohll (req->fragment_limit); | ||
336 | |||
337 | if (0 == limit) | ||
338 | ret = db->fragment_get (db->cls, &req->channel_key, | ||
339 | first_fragment_id, last_fragment_id, | ||
340 | &ret_frags, send_fragment, &sc); | ||
341 | else | ||
342 | ret = db->fragment_get_latest (db->cls, &req->channel_key, limit, | ||
343 | &ret_frags, send_fragment, &sc); | ||
344 | |||
345 | switch (ret) | ||
346 | { | ||
347 | case GNUNET_YES: | ||
348 | case GNUNET_NO: | ||
349 | if (MEMBERSHIP_TEST_DONE == sc.membership_test) | ||
350 | { | ||
351 | switch (sc.membership_test_result) | ||
352 | { | ||
353 | case GNUNET_YES: | ||
354 | break; | ||
355 | |||
356 | case GNUNET_NO: | ||
357 | ret = GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED; | ||
358 | break; | ||
359 | |||
360 | case GNUNET_SYSERR: | ||
361 | ret = GNUNET_SYSERR; | ||
362 | break; | ||
363 | } | ||
364 | } | ||
365 | break; | ||
366 | default: | ||
367 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
368 | _("Failed to get fragment!\n")); | ||
369 | } | ||
370 | send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL); | ||
371 | GNUNET_SERVICE_client_continue (client); | ||
372 | } | ||
373 | |||
374 | |||
375 | static int | ||
376 | check_client_message_get (void *cls, | ||
377 | const struct MessageGetRequest *req) | ||
378 | { | ||
379 | return GNUNET_OK; | ||
380 | } | ||
381 | |||
382 | |||
383 | static void | ||
384 | handle_client_message_get (void *cls, | ||
385 | const struct MessageGetRequest *req) | ||
386 | { | ||
387 | struct GNUNET_SERVICE_Client *client = cls; | ||
388 | |||
389 | uint16_t size = ntohs (req->header.size); | ||
390 | const char *method_prefix = (const char *) &req[1]; | ||
391 | |||
392 | if (size < sizeof (*req) + 1 | ||
393 | || '\0' != method_prefix[size - sizeof (*req) - 1]) | ||
394 | { | ||
395 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
396 | "Message get: invalid method prefix. size: %u < %u?\n", | ||
397 | size, | ||
398 | (unsigned int) (sizeof (*req) + 1)); | ||
399 | GNUNET_break (0); | ||
400 | GNUNET_SERVICE_client_drop (client); | ||
401 | return; | ||
402 | } | ||
403 | |||
404 | struct SendClosure | ||
405 | sc = { .op_id = req->op_id, | ||
406 | .client = client, | ||
407 | .channel_key = req->channel_key, | ||
408 | .slave_key = req->slave_key, | ||
409 | .membership_test = req->do_membership_test }; | ||
410 | |||
411 | int64_t ret; | ||
412 | uint64_t ret_frags = 0; | ||
413 | uint64_t first_message_id = GNUNET_ntohll (req->first_message_id); | ||
414 | uint64_t last_message_id = GNUNET_ntohll (req->last_message_id); | ||
415 | uint64_t msg_limit = GNUNET_ntohll (req->message_limit); | ||
416 | uint64_t frag_limit = GNUNET_ntohll (req->fragment_limit); | ||
417 | |||
418 | /** @todo method_prefix */ | ||
419 | if (0 == msg_limit) | ||
420 | ret = db->message_get (db->cls, &req->channel_key, | ||
421 | first_message_id, last_message_id, frag_limit, | ||
422 | &ret_frags, send_fragment, &sc); | ||
423 | else | ||
424 | ret = db->message_get_latest (db->cls, &req->channel_key, msg_limit, | ||
425 | &ret_frags, send_fragment, &sc); | ||
426 | |||
427 | switch (ret) | ||
428 | { | ||
429 | case GNUNET_YES: | ||
430 | case GNUNET_NO: | ||
431 | break; | ||
432 | default: | ||
433 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
434 | _("Failed to get message!\n")); | ||
435 | } | ||
436 | |||
437 | send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL); | ||
438 | GNUNET_SERVICE_client_continue (client); | ||
439 | } | ||
440 | |||
441 | |||
442 | static void | ||
443 | handle_client_message_get_fragment (void *cls, | ||
444 | const struct MessageGetFragmentRequest *req) | ||
445 | { | ||
446 | struct GNUNET_SERVICE_Client *client = cls; | ||
447 | |||
448 | struct SendClosure | ||
449 | sc = { .op_id = req->op_id, .client = client, | ||
450 | .channel_key = req->channel_key, .slave_key = req->slave_key, | ||
451 | .membership_test = req->do_membership_test }; | ||
452 | |||
453 | int ret = db->message_get_fragment (db->cls, &req->channel_key, | ||
454 | GNUNET_ntohll (req->message_id), | ||
455 | GNUNET_ntohll (req->fragment_offset), | ||
456 | &send_fragment, &sc); | ||
457 | switch (ret) | ||
458 | { | ||
459 | case GNUNET_YES: | ||
460 | case GNUNET_NO: | ||
461 | break; | ||
462 | default: | ||
463 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
464 | _("Failed to get message fragment!\n")); | ||
465 | } | ||
466 | |||
467 | send_result_code (client, req->op_id, ret, NULL); | ||
468 | GNUNET_SERVICE_client_continue (client); | ||
469 | } | ||
470 | |||
471 | |||
472 | static void | ||
473 | handle_client_counters_get (void *cls, | ||
474 | const struct OperationRequest *req) | ||
475 | { | ||
476 | struct GNUNET_SERVICE_Client *client = cls; | ||
477 | |||
478 | struct CountersResult *res; | ||
479 | struct GNUNET_MQ_Envelope * | ||
480 | env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS); | ||
481 | |||
482 | int ret = db->counters_message_get (db->cls, &req->channel_key, | ||
483 | &res->max_fragment_id, &res->max_message_id, | ||
484 | &res->max_group_generation); | ||
485 | switch (ret) | ||
486 | { | ||
487 | case GNUNET_OK: | ||
488 | ret = db->counters_state_get (db->cls, &req->channel_key, | ||
489 | &res->max_state_message_id); | ||
490 | case GNUNET_NO: | ||
491 | break; | ||
492 | default: | ||
493 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
494 | _("Failed to get master counters!\n")); | ||
495 | } | ||
496 | |||
497 | res->result_code = htonl (ret); | ||
498 | res->op_id = req->op_id; | ||
499 | res->max_fragment_id = GNUNET_htonll (res->max_fragment_id); | ||
500 | res->max_message_id = GNUNET_htonll (res->max_message_id); | ||
501 | res->max_group_generation = GNUNET_htonll (res->max_group_generation); | ||
502 | res->max_state_message_id = GNUNET_htonll (res->max_state_message_id); | ||
503 | |||
504 | GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); | ||
505 | GNUNET_SERVICE_client_continue (client); | ||
506 | } | ||
507 | |||
508 | |||
509 | struct StateModifyClosure | ||
510 | { | ||
511 | const struct GNUNET_CRYPTO_EddsaPublicKey channel_key; | ||
512 | struct GNUNET_PSYC_ReceiveHandle *recv; | ||
513 | enum GNUNET_PSYC_MessageState msg_state; | ||
514 | char mod_oper; | ||
515 | char *mod_name; | ||
516 | char *mod_value; | ||
517 | uint32_t mod_value_size; | ||
518 | uint32_t mod_value_remaining; | ||
519 | }; | ||
520 | |||
521 | |||
522 | static void | ||
523 | recv_state_message_part (void *cls, | ||
524 | const struct GNUNET_PSYC_MessageHeader *msg, | ||
525 | const struct GNUNET_MessageHeader *pmsg) | ||
526 | { | ||
527 | struct StateModifyClosure *scls = cls; | ||
528 | uint16_t psize; | ||
529 | |||
530 | if (NULL == msg) | ||
531 | { // FIXME: error on unknown message | ||
532 | return; | ||
533 | } | ||
534 | |||
535 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
536 | "recv_state_message_part() message_id: %" PRIu64 | ||
537 | ", fragment_offset: %" PRIu64 ", flags: %u\n", | ||
538 | GNUNET_ntohll (msg->message_id), | ||
539 | GNUNET_ntohll (msg->fragment_offset), | ||
540 | ntohl (msg->flags)); | ||
541 | |||
542 | if (NULL == pmsg) | ||
543 | { | ||
544 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
545 | return; | ||
546 | } | ||
547 | |||
548 | switch (ntohs (pmsg->type)) | ||
549 | { | ||
550 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: | ||
551 | { | ||
552 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; | ||
553 | break; | ||
554 | } | ||
555 | |||
556 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: | ||
557 | { | ||
558 | struct GNUNET_PSYC_MessageModifier * | ||
559 | pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg; | ||
560 | psize = ntohs (pmod->header.size); | ||
561 | uint16_t name_size = ntohs (pmod->name_size); | ||
562 | uint32_t value_size = ntohl (pmod->value_size); | ||
563 | |||
564 | const char *name = (const char *) &pmod[1]; | ||
565 | const void *value = name + name_size; | ||
566 | |||
567 | if (GNUNET_PSYC_OP_SET != pmod->oper) | ||
568 | { // Apply non-transient operation. | ||
569 | if (psize == sizeof (*pmod) + name_size + value_size) | ||
570 | { | ||
571 | db->state_modify_op (db->cls, &scls->channel_key, | ||
572 | pmod->oper, name, value, value_size); | ||
573 | } | ||
574 | else | ||
575 | { | ||
576 | scls->mod_oper = pmod->oper; | ||
577 | scls->mod_name = GNUNET_malloc (name_size); | ||
578 | GNUNET_memcpy (scls->mod_name, name, name_size); | ||
579 | |||
580 | scls->mod_value_size = value_size; | ||
581 | scls->mod_value = GNUNET_malloc (scls->mod_value_size); | ||
582 | scls->mod_value_remaining | ||
583 | = scls->mod_value_size - (psize - sizeof (*pmod) - name_size); | ||
584 | GNUNET_memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining); | ||
585 | } | ||
586 | } | ||
587 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; | ||
588 | break; | ||
589 | } | ||
590 | |||
591 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: | ||
592 | if (GNUNET_PSYC_OP_SET != scls->mod_oper) | ||
593 | { | ||
594 | if (scls->mod_value_remaining == 0) | ||
595 | { | ||
596 | GNUNET_break_op (0); | ||
597 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
598 | } | ||
599 | psize = ntohs (pmsg->size); | ||
600 | GNUNET_memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining), | ||
601 | &pmsg[1], psize - sizeof (*pmsg)); | ||
602 | scls->mod_value_remaining -= psize - sizeof (*pmsg); | ||
603 | if (0 == scls->mod_value_remaining) | ||
604 | { | ||
605 | db->state_modify_op (db->cls, &scls->channel_key, | ||
606 | scls->mod_oper, scls->mod_name, | ||
607 | scls->mod_value, scls->mod_value_size); | ||
608 | GNUNET_free (scls->mod_name); | ||
609 | GNUNET_free (scls->mod_value); | ||
610 | scls->mod_oper = 0; | ||
611 | scls->mod_name = NULL; | ||
612 | scls->mod_value = NULL; | ||
613 | scls->mod_value_size = 0; | ||
614 | } | ||
615 | } | ||
616 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; | ||
617 | break; | ||
618 | |||
619 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: | ||
620 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; | ||
621 | break; | ||
622 | |||
623 | case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: | ||
624 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; | ||
625 | break; | ||
626 | |||
627 | default: | ||
628 | scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; | ||
629 | } | ||
630 | } | ||
631 | |||
632 | |||
633 | static int | ||
634 | recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, | ||
635 | enum GNUNET_PSYCSTORE_MessageFlags flags) | ||
636 | { | ||
637 | struct StateModifyClosure *scls = cls; | ||
638 | |||
639 | if (NULL == scls->recv) | ||
640 | { | ||
641 | scls->recv = GNUNET_PSYC_receive_create (NULL, recv_state_message_part, | ||
642 | scls); | ||
643 | } | ||
644 | |||
645 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
646 | "recv_state_fragment: %" PRIu64 "\n", GNUNET_ntohll (msg->fragment_id)); | ||
647 | |||
648 | struct GNUNET_PSYC_MessageHeader * | ||
649 | pmsg = GNUNET_PSYC_message_header_create (msg, flags); | ||
650 | GNUNET_PSYC_receive_message (scls->recv, pmsg); | ||
651 | GNUNET_free (pmsg); | ||
652 | |||
653 | return GNUNET_YES; | ||
654 | } | ||
655 | |||
656 | |||
657 | static void | ||
658 | handle_client_state_modify (void *cls, | ||
659 | const struct StateModifyRequest *req) | ||
660 | { | ||
661 | struct GNUNET_SERVICE_Client *client = cls; | ||
662 | |||
663 | uint64_t message_id = GNUNET_ntohll (req->message_id); | ||
664 | uint64_t state_delta = GNUNET_ntohll (req->state_delta); | ||
665 | uint64_t ret_frags = 0; | ||
666 | struct StateModifyClosure | ||
667 | scls = { .channel_key = req->channel_key }; | ||
668 | |||
669 | int ret = db->state_modify_begin (db->cls, &req->channel_key, | ||
670 | message_id, state_delta); | ||
671 | |||
672 | if (GNUNET_OK != ret) | ||
673 | { | ||
674 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
675 | _("Failed to begin modifying state: %d\n"), ret); | ||
676 | } | ||
677 | else | ||
678 | { | ||
679 | ret = db->message_get (db->cls, &req->channel_key, | ||
680 | message_id, message_id, 0, | ||
681 | &ret_frags, recv_state_fragment, &scls); | ||
682 | if (GNUNET_OK != ret) | ||
683 | { | ||
684 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
685 | _("Failed to modify state: %d\n"), ret); | ||
686 | GNUNET_break (0); | ||
687 | } | ||
688 | else | ||
689 | { | ||
690 | if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) | ||
691 | { | ||
692 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
693 | _("Failed to end modifying state!\n")); | ||
694 | GNUNET_break (0); | ||
695 | } | ||
696 | } | ||
697 | if (NULL != scls.recv) | ||
698 | { | ||
699 | GNUNET_PSYC_receive_destroy (scls.recv); | ||
700 | } | ||
701 | } | ||
702 | |||
703 | send_result_code (client, req->op_id, ret, NULL); | ||
704 | GNUNET_SERVICE_client_continue (client); | ||
705 | } | ||
706 | |||
707 | |||
708 | static int | ||
709 | check_client_state_sync (void *cls, | ||
710 | const struct StateSyncRequest *req) | ||
711 | { | ||
712 | return GNUNET_OK; | ||
713 | } | ||
714 | |||
715 | |||
716 | /** @todo FIXME: stop processing further state sync messages after an error */ | ||
717 | static void | ||
718 | handle_client_state_sync (void *cls, | ||
719 | const struct StateSyncRequest *req) | ||
720 | { | ||
721 | struct GNUNET_SERVICE_Client *client = cls; | ||
722 | |||
723 | int ret = GNUNET_SYSERR; | ||
724 | const char *name = (const char *) &req[1]; | ||
725 | uint16_t name_size = ntohs (req->name_size); | ||
726 | |||
727 | if (name_size <= 2 || '\0' != name[name_size - 1]) | ||
728 | { | ||
729 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
730 | _("Tried to set invalid state variable name!\n")); | ||
731 | GNUNET_break_op (0); | ||
732 | } | ||
733 | else | ||
734 | { | ||
735 | ret = GNUNET_OK; | ||
736 | |||
737 | if (req->flags & STATE_OP_FIRST) | ||
738 | { | ||
739 | ret = db->state_sync_begin (db->cls, &req->channel_key); | ||
740 | } | ||
741 | if (ret != GNUNET_OK) | ||
742 | { | ||
743 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
744 | _("Failed to begin synchronizing state!\n")); | ||
745 | } | ||
746 | else | ||
747 | { | ||
748 | ret = db->state_sync_assign (db->cls, &req->channel_key, name, | ||
749 | name + ntohs (req->name_size), | ||
750 | ntohs (req->header.size) - sizeof (*req) | ||
751 | - ntohs (req->name_size)); | ||
752 | } | ||
753 | |||
754 | if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) | ||
755 | { | ||
756 | ret = db->state_sync_end (db->cls, &req->channel_key, | ||
757 | GNUNET_ntohll (req->max_state_message_id), | ||
758 | GNUNET_ntohll (req->state_hash_message_id)); | ||
759 | if (ret != GNUNET_OK) | ||
760 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
761 | _("Failed to end synchronizing state!\n")); | ||
762 | } | ||
763 | } | ||
764 | send_result_code (client, req->op_id, ret, NULL); | ||
765 | GNUNET_SERVICE_client_continue (client); | ||
766 | } | ||
767 | |||
768 | |||
769 | static void | ||
770 | handle_client_state_reset (void *cls, | ||
771 | const struct OperationRequest *req) | ||
772 | { | ||
773 | struct GNUNET_SERVICE_Client *client = cls; | ||
774 | |||
775 | int ret = db->state_reset (db->cls, &req->channel_key); | ||
776 | |||
777 | if (ret != GNUNET_OK) | ||
778 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
779 | _("Failed to reset state!\n")); | ||
780 | |||
781 | send_result_code (client, req->op_id, ret, NULL); | ||
782 | GNUNET_SERVICE_client_continue (client); | ||
783 | } | ||
784 | |||
785 | |||
786 | static void | ||
787 | handle_client_state_hash_update (void *cls, | ||
788 | const struct StateHashUpdateRequest *req) | ||
789 | { | ||
790 | struct GNUNET_SERVICE_Client *client = cls; | ||
791 | |||
792 | int ret = db->state_reset (db->cls, &req->channel_key); | ||
793 | if (ret != GNUNET_OK) | ||
794 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
795 | _("Failed to reset state!\n")); | ||
796 | |||
797 | send_result_code (client, req->op_id, ret, NULL); | ||
798 | GNUNET_SERVICE_client_continue (client); | ||
799 | } | ||
800 | |||
801 | |||
802 | static int | ||
803 | check_client_state_get (void *cls, | ||
804 | const struct OperationRequest *req) | ||
805 | { | ||
806 | return GNUNET_OK; | ||
807 | } | ||
808 | |||
809 | |||
810 | static void | ||
811 | handle_client_state_get (void *cls, | ||
812 | const struct OperationRequest *req) | ||
813 | { | ||
814 | struct GNUNET_SERVICE_Client *client = cls; | ||
815 | |||
816 | struct SendClosure sc = { .op_id = req->op_id, .client = client }; | ||
817 | int64_t ret = GNUNET_SYSERR; | ||
818 | const char *name = (const char *) &req[1]; | ||
819 | uint16_t name_size = ntohs (req->header.size) - sizeof (*req); | ||
820 | |||
821 | if (name_size <= 2 || '\0' != name[name_size - 1]) | ||
822 | { | ||
823 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
824 | _("Tried to get invalid state variable name!\n")); | ||
825 | GNUNET_break (0); | ||
826 | } | ||
827 | else | ||
828 | { | ||
829 | ret = db->state_get (db->cls, &req->channel_key, name, | ||
830 | &send_state_var, &sc); | ||
831 | if (GNUNET_NO == ret && name_size >= 5) /* min: _a_b\0 */ | ||
832 | { | ||
833 | char *p, *n = GNUNET_malloc (name_size); | ||
834 | GNUNET_memcpy (n, name, name_size); | ||
835 | while (&n[1] < (p = strrchr (n, '_')) && GNUNET_NO == ret) | ||
836 | { | ||
837 | *p = '\0'; | ||
838 | ret = db->state_get (db->cls, &req->channel_key, n, | ||
839 | &send_state_var, &sc); | ||
840 | } | ||
841 | GNUNET_free (n); | ||
842 | } | ||
843 | } | ||
844 | switch (ret) | ||
845 | { | ||
846 | case GNUNET_OK: | ||
847 | case GNUNET_NO: | ||
848 | break; | ||
849 | default: | ||
850 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
851 | _("Failed to get state variable!\n")); | ||
852 | } | ||
853 | |||
854 | send_result_code (client, req->op_id, ret, NULL); | ||
855 | GNUNET_SERVICE_client_continue (client); | ||
856 | } | ||
857 | |||
858 | |||
859 | static int | ||
860 | check_client_state_get_prefix (void *cls, | ||
861 | const struct OperationRequest *req) | ||
862 | { | ||
863 | return GNUNET_OK; | ||
864 | } | ||
865 | |||
866 | |||
867 | static void | ||
868 | handle_client_state_get_prefix (void *cls, | ||
869 | const struct OperationRequest *req) | ||
870 | { | ||
871 | struct GNUNET_SERVICE_Client *client = cls; | ||
872 | |||
873 | struct SendClosure sc = { .op_id = req->op_id, .client = client }; | ||
874 | int64_t ret = GNUNET_SYSERR; | ||
875 | const char *name = (const char *) &req[1]; | ||
876 | uint16_t name_size = ntohs (req->header.size) - sizeof (*req); | ||
877 | |||
878 | if (name_size <= 1 || '\0' != name[name_size - 1]) | ||
879 | { | ||
880 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
881 | _("Tried to get invalid state variable name!\n")); | ||
882 | GNUNET_break (0); | ||
883 | } | ||
884 | else | ||
885 | { | ||
886 | ret = db->state_get_prefix (db->cls, &req->channel_key, name, | ||
887 | &send_state_var, &sc); | ||
888 | } | ||
889 | switch (ret) | ||
890 | { | ||
891 | case GNUNET_OK: | ||
892 | case GNUNET_NO: | ||
893 | break; | ||
894 | default: | ||
895 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
896 | _("Failed to get state variable!\n")); | ||
897 | } | ||
898 | |||
899 | send_result_code (client, req->op_id, ret, NULL); | ||
900 | GNUNET_SERVICE_client_continue (client); | ||
901 | } | ||
902 | |||
903 | |||
904 | /** | ||
905 | * A new client connected. | ||
906 | * | ||
907 | * @param cls NULL | ||
908 | * @param client client to add | ||
909 | * @param mq message queue for @a client | ||
910 | * @return @a client | ||
911 | */ | ||
912 | static void * | ||
913 | client_notify_connect (void *cls, | ||
914 | struct GNUNET_SERVICE_Client *client, | ||
915 | struct GNUNET_MQ_Handle *mq) | ||
916 | { | ||
917 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); | ||
918 | |||
919 | return client; | ||
920 | } | ||
921 | |||
922 | |||
923 | /** | ||
924 | * Called whenever a client is disconnected. | ||
925 | * Frees our resources associated with that client. | ||
926 | * | ||
927 | * @param cls closure | ||
928 | * @param client identification of the client | ||
929 | * @param app_ctx must match @a client | ||
930 | */ | ||
931 | static void | ||
932 | client_notify_disconnect (void *cls, | ||
933 | struct GNUNET_SERVICE_Client *client, | ||
934 | void *app_ctx) | ||
935 | { | ||
936 | } | ||
937 | |||
938 | |||
939 | /** | ||
940 | * Initialize the PSYCstore service. | ||
941 | * | ||
942 | * @param cls Closure. | ||
943 | * @param server The initialized server. | ||
944 | * @param c Configuration to use. | ||
945 | */ | ||
946 | static void | ||
947 | run (void *cls, | ||
948 | const struct GNUNET_CONFIGURATION_Handle *c, | ||
949 | struct GNUNET_SERVICE_Handle *svc) | ||
950 | { | ||
951 | cfg = c; | ||
952 | service = svc; | ||
953 | |||
954 | /* Loading database plugin */ | ||
955 | char *database; | ||
956 | if (GNUNET_OK != | ||
957 | GNUNET_CONFIGURATION_get_value_string (cfg, "psycstore", "database", | ||
958 | &database)) | ||
959 | { | ||
960 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, | ||
961 | "psycstore", | ||
962 | "database"); | ||
963 | } | ||
964 | else | ||
965 | { | ||
966 | GNUNET_asprintf (&db_lib_name, | ||
967 | "libgnunet_plugin_psycstore_%s", | ||
968 | database); | ||
969 | db = GNUNET_PLUGIN_load (db_lib_name, (void *) cfg); | ||
970 | GNUNET_free (database); | ||
971 | } | ||
972 | if (NULL == db) | ||
973 | { | ||
974 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
975 | "Could not load database backend `%s'\n", | ||
976 | db_lib_name); | ||
977 | GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); | ||
978 | return; | ||
979 | } | ||
980 | |||
981 | stats = GNUNET_STATISTICS_create ("psycstore", cfg); | ||
982 | GNUNET_SCHEDULER_add_shutdown (shutdown_task, | ||
983 | NULL); | ||
984 | } | ||
985 | |||
986 | /** | ||
987 | * Define "main" method using service macro. | ||
988 | */ | ||
989 | GNUNET_SERVICE_MAIN | ||
990 | ("psycstore", | ||
991 | GNUNET_SERVICE_OPTION_NONE, | ||
992 | run, | ||
993 | client_notify_connect, | ||
994 | client_notify_disconnect, | ||
995 | NULL, | ||
996 | GNUNET_MQ_hd_fixed_size (client_membership_store, | ||
997 | GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE, | ||
998 | struct MembershipStoreRequest, | ||
999 | NULL), | ||
1000 | GNUNET_MQ_hd_fixed_size (client_membership_test, | ||
1001 | GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST, | ||
1002 | struct MembershipTestRequest, | ||
1003 | NULL), | ||
1004 | GNUNET_MQ_hd_var_size (client_fragment_store, | ||
1005 | GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE, | ||
1006 | struct FragmentStoreRequest, | ||
1007 | NULL), | ||
1008 | GNUNET_MQ_hd_fixed_size (client_fragment_get, | ||
1009 | GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET, | ||
1010 | struct FragmentGetRequest, | ||
1011 | NULL), | ||
1012 | GNUNET_MQ_hd_var_size (client_message_get, | ||
1013 | GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET, | ||
1014 | struct MessageGetRequest, | ||
1015 | NULL), | ||
1016 | GNUNET_MQ_hd_fixed_size (client_message_get_fragment, | ||
1017 | GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT, | ||
1018 | struct MessageGetFragmentRequest, | ||
1019 | NULL), | ||
1020 | GNUNET_MQ_hd_fixed_size (client_counters_get, | ||
1021 | GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET, | ||
1022 | struct OperationRequest, | ||
1023 | NULL), | ||
1024 | GNUNET_MQ_hd_fixed_size (client_state_modify, | ||
1025 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY, | ||
1026 | struct StateModifyRequest, | ||
1027 | NULL), | ||
1028 | GNUNET_MQ_hd_var_size (client_state_sync, | ||
1029 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC, | ||
1030 | struct StateSyncRequest, | ||
1031 | NULL), | ||
1032 | GNUNET_MQ_hd_fixed_size (client_state_reset, | ||
1033 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET, | ||
1034 | struct OperationRequest, | ||
1035 | NULL), | ||
1036 | GNUNET_MQ_hd_fixed_size (client_state_hash_update, | ||
1037 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE, | ||
1038 | struct StateHashUpdateRequest, | ||
1039 | NULL), | ||
1040 | GNUNET_MQ_hd_var_size (client_state_get, | ||
1041 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET, | ||
1042 | struct OperationRequest, | ||
1043 | NULL), | ||
1044 | GNUNET_MQ_hd_var_size (client_state_get_prefix, | ||
1045 | GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX, | ||
1046 | struct OperationRequest, | ||
1047 | NULL)); | ||
1048 | |||
1049 | /* end of gnunet-service-psycstore.c */ | ||