aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-06-04 11:57:59 +0200
committerChristian Grothoff <christian@grothoff.org>2019-06-04 11:57:59 +0200
commit58002acac13b2eef407a20ee3ddc5f458cd5e483 (patch)
tree8285fc1b32a8c9ac55f970667e7d29b791b6758e /src/peerstore
parent9ce956ea4c93f038995a21c6c1c0133eee6bff75 (diff)
downloadgnunet-58002acac13b2eef407a20ee3ddc5f458cd5e483.tar.gz
gnunet-58002acac13b2eef407a20ee3ddc5f458cd5e483.zip
nicer loop structure
Diffstat (limited to 'src/peerstore')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c264
-rw-r--r--src/peerstore/peerstore_api.c247
2 files changed, 215 insertions, 296 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index 9bce542b9..ed1c35535 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -11,7 +11,7 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
@@ -33,7 +33,7 @@
33/** 33/**
34 * Interval for expired records cleanup (in seconds) 34 * Interval for expired records cleanup (in seconds)
35 */ 35 */
36#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */ 36#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
37 37
38/** 38/**
39 * Our configuration. 39 * Our configuration.
@@ -79,9 +79,7 @@ do_shutdown ()
79{ 79{
80 if (NULL != db_lib_name) 80 if (NULL != db_lib_name)
81 { 81 {
82 GNUNET_break (NULL == 82 GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db));
83 GNUNET_PLUGIN_unload (db_lib_name,
84 db));
85 GNUNET_free (db_lib_name); 83 GNUNET_free (db_lib_name);
86 db_lib_name = NULL; 84 db_lib_name = NULL;
87 } 85 }
@@ -108,15 +106,14 @@ static void
108shutdown_task (void *cls) 106shutdown_task (void *cls)
109{ 107{
110 in_shutdown = GNUNET_YES; 108 in_shutdown = GNUNET_YES;
111 if (0 == num_clients) /* Only when no connected clients. */ 109 if (0 == num_clients) /* Only when no connected clients. */
112 do_shutdown (); 110 do_shutdown ();
113} 111}
114 112
115 113
116/* Forward declaration */ 114/* Forward declaration */
117static void 115static void
118expire_records_continuation (void *cls, 116expire_records_continuation (void *cls, int success);
119 int success);
120 117
121 118
122/** 119/**
@@ -131,16 +128,16 @@ cleanup_expired_records (void *cls)
131 GNUNET_assert (NULL != db); 128 GNUNET_assert (NULL != db);
132 ret = db->expire_records (db->cls, 129 ret = db->expire_records (db->cls,
133 GNUNET_TIME_absolute_get (), 130 GNUNET_TIME_absolute_get (),
134 &expire_records_continuation, 131 &expire_records_continuation,
135 NULL); 132 NULL);
136 if (GNUNET_OK != ret) 133 if (GNUNET_OK != ret)
137 { 134 {
138 GNUNET_assert (NULL == expire_task); 135 GNUNET_assert (NULL == expire_task);
139 expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 136 expire_task = GNUNET_SCHEDULER_add_delayed (
140 (GNUNET_TIME_UNIT_SECONDS, 137 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
141 EXPIRED_RECORDS_CLEANUP_INTERVAL), 138 EXPIRED_RECORDS_CLEANUP_INTERVAL),
142 &cleanup_expired_records, 139 &cleanup_expired_records,
143 NULL); 140 NULL);
144 } 141 }
145} 142}
146 143
@@ -152,19 +149,16 @@ cleanup_expired_records (void *cls)
152 * @param success count of records deleted or #GNUNET_SYSERR 149 * @param success count of records deleted or #GNUNET_SYSERR
153 */ 150 */
154static void 151static void
155expire_records_continuation (void *cls, 152expire_records_continuation (void *cls, int success)
156 int success)
157{ 153{
158 if (success > 0) 154 if (success > 0)
159 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 155 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", success);
160 "%d records expired.\n",
161 success);
162 GNUNET_assert (NULL == expire_task); 156 GNUNET_assert (NULL == expire_task);
163 expire_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 157 expire_task = GNUNET_SCHEDULER_add_delayed (
164 (GNUNET_TIME_UNIT_SECONDS, 158 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
165 EXPIRED_RECORDS_CLEANUP_INTERVAL), 159 EXPIRED_RECORDS_CLEANUP_INTERVAL),
166 &cleanup_expired_records, 160 &cleanup_expired_records,
167 NULL); 161 NULL);
168} 162}
169 163
170 164
@@ -195,15 +189,12 @@ client_connect_cb (void *cls,
195 * @return #GNUNET_OK to continue iterating 189 * @return #GNUNET_OK to continue iterating
196 */ 190 */
197static int 191static int
198client_disconnect_it (void *cls, 192client_disconnect_it (void *cls, const struct GNUNET_HashCode *key, void *value)
199 const struct GNUNET_HashCode *key,
200 void *value)
201{ 193{
202 if (value == cls) 194 if (value == cls)
203 { 195 {
204 GNUNET_CONTAINER_multihashmap_remove (watchers, 196 GNUNET_assert (GNUNET_YES ==
205 key, 197 GNUNET_CONTAINER_multihashmap_remove (watchers, key, value));
206 value);
207 num_clients++; 198 num_clients++;
208 } 199 }
209 return GNUNET_OK; 200 return GNUNET_OK;
@@ -221,15 +212,13 @@ client_disconnect_cb (void *cls,
221 struct GNUNET_SERVICE_Client *client, 212 struct GNUNET_SERVICE_Client *client,
222 void *app_cls) 213 void *app_cls)
223{ 214{
224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 215 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "A client disconnected, cleaning up.\n");
225 "A client disconnected, cleaning up.\n");
226 if (NULL != watchers) 216 if (NULL != watchers)
227 GNUNET_CONTAINER_multihashmap_iterate (watchers, 217 GNUNET_CONTAINER_multihashmap_iterate (watchers,
228 &client_disconnect_it, 218 &client_disconnect_it,
229 client); 219 client);
230 num_clients--; 220 num_clients--;
231 if ( (0 == num_clients) && 221 if ((0 == num_clients) && in_shutdown)
232 in_shutdown)
233 do_shutdown (); 222 do_shutdown ();
234} 223}
235 224
@@ -255,10 +244,8 @@ record_iterator (void *cls,
255 /* No more records */ 244 /* No more records */
256 struct GNUNET_MessageHeader *endmsg; 245 struct GNUNET_MessageHeader *endmsg;
257 246
258 env = GNUNET_MQ_msg (endmsg, 247 env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
259 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); 248 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env);
260 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client),
261 env);
262 if (NULL == emsg) 249 if (NULL == emsg)
263 { 250 {
264 GNUNET_SERVICE_client_continue (cls_record->client); 251 GNUNET_SERVICE_client_continue (cls_record->client);
@@ -266,25 +253,23 @@ record_iterator (void *cls,
266 else 253 else
267 { 254 {
268 GNUNET_break (0); 255 GNUNET_break (0);
269 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 256 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg);
270 "Failed to iterate: %s\n",
271 emsg);
272 GNUNET_SERVICE_client_drop (cls_record->client); 257 GNUNET_SERVICE_client_drop (cls_record->client);
273 } 258 }
274 PEERSTORE_destroy_record (cls_record); 259 PEERSTORE_destroy_record (cls_record);
275 return; 260 return;
276 } 261 }
277 262
278 env = PEERSTORE_create_record_mq_envelope (record->sub_system, 263 env = PEERSTORE_create_record_mq_envelope (
279 &record->peer, 264 record->sub_system,
280 record->key, 265 &record->peer,
281 record->value, 266 record->key,
282 record->value_size, 267 record->value,
283 record->expiry, 268 record->value_size,
284 0, 269 record->expiry,
285 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); 270 0,
286 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), 271 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
287 env); 272 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env);
288} 273}
289 274
290 275
@@ -298,26 +283,23 @@ record_iterator (void *cls,
298 * @return #GNUNET_YES to continue iterating 283 * @return #GNUNET_YES to continue iterating
299 */ 284 */
300static int 285static int
301watch_notifier_it (void *cls, 286watch_notifier_it (void *cls, const struct GNUNET_HashCode *key, void *value)
302 const struct GNUNET_HashCode *key,
303 void *value)
304{ 287{
305 struct GNUNET_PEERSTORE_Record *record = cls; 288 struct GNUNET_PEERSTORE_Record *record = cls;
306 struct GNUNET_SERVICE_Client *client = value; 289 struct GNUNET_SERVICE_Client *client = value;
307 struct GNUNET_MQ_Envelope *env; 290 struct GNUNET_MQ_Envelope *env;
308 291
309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
310 "Found a watcher to update.\n"); 293 env = PEERSTORE_create_record_mq_envelope (
311 env = PEERSTORE_create_record_mq_envelope (record->sub_system, 294 record->sub_system,
312 &record->peer, 295 &record->peer,
313 record->key, 296 record->key,
314 record->value, 297 record->value,
315 record->value_size, 298 record->value_size,
316 record->expiry, 299 record->expiry,
317 0, 300 0,
318 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); 301 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
319 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), 302 GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
320 env);
321 return GNUNET_YES; 303 return GNUNET_YES;
322} 304}
323 305
@@ -332,10 +314,7 @@ watch_notifier (struct GNUNET_PEERSTORE_Record *record)
332{ 314{
333 struct GNUNET_HashCode keyhash; 315 struct GNUNET_HashCode keyhash;
334 316
335 PEERSTORE_hash_key (record->sub_system, 317 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
336 &record->peer,
337 record->key,
338 &keyhash);
339 GNUNET_CONTAINER_multihashmap_get_multiple (watchers, 318 GNUNET_CONTAINER_multihashmap_get_multiple (watchers,
340 &keyhash, 319 &keyhash,
341 &watch_notifier_it, 320 &watch_notifier_it,
@@ -350,17 +329,13 @@ watch_notifier (struct GNUNET_PEERSTORE_Record *record)
350 * @param hm the actual message 329 * @param hm the actual message
351 */ 330 */
352static void 331static void
353handle_watch_cancel (void *cls, 332handle_watch_cancel (void *cls, const struct StoreKeyHashMessage *hm)
354 const struct StoreKeyHashMessage *hm)
355{ 333{
356 struct GNUNET_SERVICE_Client *client = cls; 334 struct GNUNET_SERVICE_Client *client = cls;
357 335
358 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request.\n");
359 "Received a watch cancel request.\n");
360 if (GNUNET_OK != 337 if (GNUNET_OK !=
361 GNUNET_CONTAINER_multihashmap_remove (watchers, 338 GNUNET_CONTAINER_multihashmap_remove (watchers, &hm->keyhash, client))
362 &hm->keyhash,
363 client))
364 { 339 {
365 GNUNET_break (0); 340 GNUNET_break (0);
366 GNUNET_SERVICE_client_drop (client); 341 GNUNET_SERVICE_client_drop (client);
@@ -378,13 +353,11 @@ handle_watch_cancel (void *cls,
378 * @param hm the actual message 353 * @param hm the actual message
379 */ 354 */
380static void 355static void
381handle_watch (void *cls, 356handle_watch (void *cls, const struct StoreKeyHashMessage *hm)
382 const struct StoreKeyHashMessage *hm)
383{ 357{
384 struct GNUNET_SERVICE_Client *client = cls; 358 struct GNUNET_SERVICE_Client *client = cls;
385 359
386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch request.\n");
387 "Received a watch request.\n");
388 num_clients--; /* do not count watchers */ 361 num_clients--; /* do not count watchers */
389 GNUNET_SERVICE_client_mark_monitor (client); 362 GNUNET_SERVICE_client_mark_monitor (client);
390 GNUNET_CONTAINER_multihashmap_put (watchers, 363 GNUNET_CONTAINER_multihashmap_put (watchers,
@@ -403,8 +376,7 @@ handle_watch (void *cls,
403 * @return #GNUNET_OK if @a srm is well-formed 376 * @return #GNUNET_OK if @a srm is well-formed
404 */ 377 */
405static int 378static int
406check_iterate (void *cls, 379check_iterate (void *cls, const struct StoreRecordMessage *srm)
407 const struct StoreRecordMessage *srm)
408{ 380{
409 struct GNUNET_PEERSTORE_Record *record; 381 struct GNUNET_PEERSTORE_Record *record;
410 382
@@ -432,8 +404,7 @@ check_iterate (void *cls,
432 * @param srm the actual message 404 * @param srm the actual message
433 */ 405 */
434static void 406static void
435handle_iterate (void *cls, 407handle_iterate (void *cls, const struct StoreRecordMessage *srm)
436 const struct StoreRecordMessage *srm)
437{ 408{
438 struct GNUNET_SERVICE_Client *client = cls; 409 struct GNUNET_SERVICE_Client *client = cls;
439 struct GNUNET_PEERSTORE_Record *record; 410 struct GNUNET_PEERSTORE_Record *record;
@@ -467,8 +438,7 @@ handle_iterate (void *cls,
467 * @param success result 438 * @param success result
468 */ 439 */
469static void 440static void
470store_record_continuation (void *cls, 441store_record_continuation (void *cls, int success)
471 int success)
472{ 442{
473 struct GNUNET_PEERSTORE_Record *record = cls; 443 struct GNUNET_PEERSTORE_Record *record = cls;
474 444
@@ -494,8 +464,7 @@ store_record_continuation (void *cls,
494 * @return #GNUNET_OK if @a srm is well-formed 464 * @return #GNUNET_OK if @a srm is well-formed
495 */ 465 */
496static int 466static int
497check_store (void *cls, 467check_store (void *cls, const struct StoreRecordMessage *srm)
498 const struct StoreRecordMessage *srm)
499{ 468{
500 struct GNUNET_PEERSTORE_Record *record; 469 struct GNUNET_PEERSTORE_Record *record;
501 470
@@ -505,8 +474,7 @@ check_store (void *cls,
505 GNUNET_break (0); 474 GNUNET_break (0);
506 return GNUNET_SYSERR; 475 return GNUNET_SYSERR;
507 } 476 }
508 if ( (NULL == record->sub_system) || 477 if ((NULL == record->sub_system) || (NULL == record->key))
509 (NULL == record->key) )
510 { 478 {
511 GNUNET_break (0); 479 GNUNET_break (0);
512 PEERSTORE_destroy_record (record); 480 PEERSTORE_destroy_record (record);
@@ -524,31 +492,30 @@ check_store (void *cls,
524 * @param srm the actual message 492 * @param srm the actual message
525 */ 493 */
526static void 494static void
527handle_store (void *cls, 495handle_store (void *cls, const struct StoreRecordMessage *srm)
528 const struct StoreRecordMessage *srm)
529{ 496{
530 struct GNUNET_SERVICE_Client *client = cls; 497 struct GNUNET_SERVICE_Client *client = cls;
531 struct GNUNET_PEERSTORE_Record *record; 498 struct GNUNET_PEERSTORE_Record *record;
532 499
533 record = PEERSTORE_parse_record_message (srm); 500 record = PEERSTORE_parse_record_message (srm);
534 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 501 GNUNET_log (
535 "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %u.\n", 502 GNUNET_ERROR_TYPE_INFO,
536 record->sub_system, 503 "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %u.\n",
537 GNUNET_i2s (&record->peer), 504 record->sub_system,
538 record->key, 505 GNUNET_i2s (&record->peer),
539 (uint32_t) ntohl (srm->options)); 506 record->key,
507 (uint32_t) ntohl (srm->options));
540 record->client = client; 508 record->client = client;
541 if (GNUNET_OK != 509 if (GNUNET_OK != db->store_record (db->cls,
542 db->store_record (db->cls, 510 record->sub_system,
543 record->sub_system, 511 &record->peer,
544 &record->peer, 512 record->key,
545 record->key, 513 record->value,
546 record->value, 514 record->value_size,
547 record->value_size, 515 record->expiry,
548 record->expiry, 516 ntohl (srm->options),
549 ntohl (srm->options), 517 &store_record_continuation,
550 &store_record_continuation, 518 record))
551 record))
552 { 519 {
553 GNUNET_break (0); 520 GNUNET_break (0);
554 PEERSTORE_destroy_record (record); 521 PEERSTORE_destroy_record (record);
@@ -574,11 +541,10 @@ run (void *cls,
574 541
575 in_shutdown = GNUNET_NO; 542 in_shutdown = GNUNET_NO;
576 cfg = c; 543 cfg = c;
577 if (GNUNET_OK != 544 if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg,
578 GNUNET_CONFIGURATION_get_value_string (cfg, 545 "peerstore",
579 "peerstore", 546 "DATABASE",
580 "DATABASE", 547 &database))
581 &database))
582 { 548 {
583 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, 549 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
584 "peerstore", 550 "peerstore",
@@ -586,56 +552,50 @@ run (void *cls,
586 GNUNET_SCHEDULER_shutdown (); 552 GNUNET_SCHEDULER_shutdown ();
587 return; 553 return;
588 } 554 }
589 GNUNET_asprintf (&db_lib_name, 555 GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_peerstore_%s", database);
590 "libgnunet_plugin_peerstore_%s", 556 db = GNUNET_PLUGIN_load (db_lib_name, (void *) cfg);
591 database);
592 db = GNUNET_PLUGIN_load (db_lib_name,
593 (void *) cfg);
594 GNUNET_free (database); 557 GNUNET_free (database);
595 if (NULL == db) 558 if (NULL == db)
596 { 559 {
597 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 560 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
598 _("Could not load database backend `%s'\n"), 561 _ ("Could not load database backend `%s'\n"),
599 db_lib_name); 562 db_lib_name);
600 GNUNET_SCHEDULER_shutdown (); 563 GNUNET_SCHEDULER_shutdown ();
601 return; 564 return;
602 } 565 }
603 watchers = GNUNET_CONTAINER_multihashmap_create (10, 566 watchers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
604 GNUNET_NO); 567 expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records, NULL);
605 expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records, 568 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
606 NULL);
607 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
608 NULL);
609} 569}
610 570
611 571
612/** 572/**
613 * Define "main" method using service macro. 573 * Define "main" method using service macro.
614 */ 574 */
615GNUNET_SERVICE_MAIN 575GNUNET_SERVICE_MAIN (
616("peerstore", 576 "peerstore",
617 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, 577 GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
618 &run, 578 &run,
619 &client_connect_cb, 579 &client_connect_cb,
620 &client_disconnect_cb, 580 &client_disconnect_cb,
621 NULL, 581 NULL,
622 GNUNET_MQ_hd_var_size (store, 582 GNUNET_MQ_hd_var_size (store,
623 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 583 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE,
624 struct StoreRecordMessage, 584 struct StoreRecordMessage,
625 NULL), 585 NULL),
626 GNUNET_MQ_hd_var_size (iterate, 586 GNUNET_MQ_hd_var_size (iterate,
627 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 587 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE,
628 struct StoreRecordMessage, 588 struct StoreRecordMessage,
629 NULL), 589 NULL),
630 GNUNET_MQ_hd_fixed_size (watch, 590 GNUNET_MQ_hd_fixed_size (watch,
631 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, 591 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH,
632 struct StoreKeyHashMessage, 592 struct StoreKeyHashMessage,
633 NULL), 593 NULL),
634 GNUNET_MQ_hd_fixed_size (watch_cancel, 594 GNUNET_MQ_hd_fixed_size (watch_cancel,
635 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, 595 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL,
636 struct StoreKeyHashMessage, 596 struct StoreKeyHashMessage,
637 NULL), 597 NULL),
638 GNUNET_MQ_handler_end ()); 598 GNUNET_MQ_handler_end ());
639 599
640 600
641/* end of gnunet-service-peerstore.c */ 601/* end of gnunet-service-peerstore.c */
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 02f3e287a..64bc3ae72 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -28,7 +28,7 @@
28#include "peerstore.h" 28#include "peerstore.h"
29#include "peerstore_common.h" 29#include "peerstore_common.h"
30 30
31#define LOG(kind,...) GNUNET_log_from (kind, "peerstore-api",__VA_ARGS__) 31#define LOG(kind, ...) GNUNET_log_from (kind, "peerstore-api", __VA_ARGS__)
32 32
33/******************************************************************************/ 33/******************************************************************************/
34/************************ DATA STRUCTURES ****************************/ 34/************************ DATA STRUCTURES ****************************/
@@ -89,7 +89,6 @@ struct GNUNET_PEERSTORE_Handle
89 * Are we in the process of disconnecting but need to sync first? 89 * Are we in the process of disconnecting but need to sync first?
90 */ 90 */
91 int disconnecting; 91 int disconnecting;
92
93}; 92};
94 93
95/** 94/**
@@ -156,7 +155,6 @@ struct GNUNET_PEERSTORE_StoreContext
156 * Options for the store operation. 155 * Options for the store operation.
157 */ 156 */
158 enum GNUNET_PEERSTORE_StoreOption options; 157 enum GNUNET_PEERSTORE_StoreOption options;
159
160}; 158};
161 159
162/** 160/**
@@ -208,7 +206,6 @@ struct GNUNET_PEERSTORE_IterateContext
208 * #GNUNET_YES if we are currently processing records. 206 * #GNUNET_YES if we are currently processing records.
209 */ 207 */
210 int iterating; 208 int iterating;
211
212}; 209};
213 210
214/** 211/**
@@ -245,7 +242,6 @@ struct GNUNET_PEERSTORE_WatchContext
245 * Hash of the combined key 242 * Hash of the combined key
246 */ 243 */
247 struct GNUNET_HashCode keyhash; 244 struct GNUNET_HashCode keyhash;
248
249}; 245};
250 246
251/******************************************************************************/ 247/******************************************************************************/
@@ -271,8 +267,7 @@ disconnect (struct GNUNET_PEERSTORE_Handle *h)
271{ 267{
272 struct GNUNET_PEERSTORE_IterateContext *next; 268 struct GNUNET_PEERSTORE_IterateContext *next;
273 269
274 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; 270 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
275 NULL != ic;
276 ic = next) 271 ic = next)
277 { 272 {
278 next = ic->next; 273 next = ic->next;
@@ -285,9 +280,7 @@ disconnect (struct GNUNET_PEERSTORE_Handle *h)
285 icb_cls = ic->callback_cls; 280 icb_cls = ic->callback_cls;
286 GNUNET_PEERSTORE_iterate_cancel (ic); 281 GNUNET_PEERSTORE_iterate_cancel (ic);
287 if (NULL != icb) 282 if (NULL != icb)
288 icb (icb_cls, 283 icb (icb_cls, NULL, "Iteration canceled due to reconnection");
289 NULL,
290 "Iteration canceled due to reconnection");
291 } 284 }
292 } 285 }
293 286
@@ -312,17 +305,13 @@ disconnect_and_schedule_reconnect (struct GNUNET_PEERSTORE_Handle *h)
312 disconnect (h); 305 disconnect (h);
313 LOG (GNUNET_ERROR_TYPE_DEBUG, 306 LOG (GNUNET_ERROR_TYPE_DEBUG,
314 "Scheduling task to reconnect to PEERSTORE service in %s.\n", 307 "Scheduling task to reconnect to PEERSTORE service in %s.\n",
315 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, 308 GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
316 GNUNET_YES));
317 h->reconnect_task = 309 h->reconnect_task =
318 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, 310 GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
319 &reconnect,
320 h);
321 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); 311 h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
322} 312}
323 313
324 314
325
326/** 315/**
327 * Callback after MQ envelope is sent 316 * Callback after MQ envelope is sent
328 * 317 *
@@ -352,8 +341,7 @@ store_request_sent (void *cls)
352 * Function called when we had trouble talking to the service. 341 * Function called when we had trouble talking to the service.
353 */ 342 */
354static void 343static void
355handle_client_error (void *cls, 344handle_client_error (void *cls, enum GNUNET_MQ_Error error)
356 enum GNUNET_MQ_Error error)
357{ 345{
358 struct GNUNET_PEERSTORE_Handle *h = cls; 346 struct GNUNET_PEERSTORE_Handle *h = cls;
359 347
@@ -373,9 +361,7 @@ handle_client_error (void *cls,
373 * @return #GNUNET_YES (continue to iterate) 361 * @return #GNUNET_YES (continue to iterate)
374 */ 362 */
375static int 363static int
376rewatch_it (void *cls, 364rewatch_it (void *cls, const struct GNUNET_HashCode *key, void *value)
377 const struct GNUNET_HashCode *key,
378 void *value)
379{ 365{
380 struct GNUNET_PEERSTORE_Handle *h = cls; 366 struct GNUNET_PEERSTORE_Handle *h = cls;
381 struct GNUNET_PEERSTORE_WatchContext *wc = value; 367 struct GNUNET_PEERSTORE_WatchContext *wc = value;
@@ -398,9 +384,7 @@ rewatch_it (void *cls,
398 * @return #GNUNET_YES to continue iteration 384 * @return #GNUNET_YES to continue iteration
399 */ 385 */
400static int 386static int
401destroy_watch (void *cls, 387destroy_watch (void *cls, const struct GNUNET_HashCode *key, void *value)
402 const struct GNUNET_HashCode *key,
403 void *value)
404{ 388{
405 struct GNUNET_PEERSTORE_WatchContext *wc = value; 389 struct GNUNET_PEERSTORE_WatchContext *wc = value;
406 390
@@ -461,8 +445,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
461 * @param sync_first send any pending STORE requests before disconnecting 445 * @param sync_first send any pending STORE requests before disconnecting
462 */ 446 */
463void 447void
464GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, 448GNUNET_PEERSTORE_disconnect (struct GNUNET_PEERSTORE_Handle *h, int sync_first)
465 int sync_first)
466{ 449{
467 struct GNUNET_PEERSTORE_IterateContext *ic; 450 struct GNUNET_PEERSTORE_IterateContext *ic;
468 struct GNUNET_PEERSTORE_StoreContext *sc; 451 struct GNUNET_PEERSTORE_StoreContext *sc;
@@ -515,8 +498,7 @@ GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
515 GNUNET_free (sc->value); 498 GNUNET_free (sc->value);
516 GNUNET_free (sc->key); 499 GNUNET_free (sc->key);
517 GNUNET_free (sc); 500 GNUNET_free (sc);
518 if ( (GNUNET_YES == h->disconnecting) && 501 if ((GNUNET_YES == h->disconnecting) && (NULL == h->store_head))
519 (NULL == h->store_head) )
520 final_disconnect (h); 502 final_disconnect (h);
521} 503}
522 504
@@ -542,7 +524,8 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
542 const char *sub_system, 524 const char *sub_system,
543 const struct GNUNET_PeerIdentity *peer, 525 const struct GNUNET_PeerIdentity *peer,
544 const char *key, 526 const char *key,
545 const void *value, size_t size, 527 const void *value,
528 size_t size,
546 struct GNUNET_TIME_Absolute expiry, 529 struct GNUNET_TIME_Absolute expiry,
547 enum GNUNET_PEERSTORE_StoreOption options, 530 enum GNUNET_PEERSTORE_StoreOption options,
548 GNUNET_PEERSTORE_Continuation cont, 531 GNUNET_PEERSTORE_Continuation cont,
@@ -553,10 +536,19 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
553 536
554 LOG (GNUNET_ERROR_TYPE_DEBUG, 537 LOG (GNUNET_ERROR_TYPE_DEBUG,
555 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n", 538 "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
556 size, sub_system, GNUNET_i2s (peer), key); 539 size,
557 ev = PEERSTORE_create_record_mq_envelope (sub_system, peer, key, value, size, 540 sub_system,
558 expiry, options, 541 GNUNET_i2s (peer),
559 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 542 key);
543 ev =
544 PEERSTORE_create_record_mq_envelope (sub_system,
545 peer,
546 key,
547 value,
548 size,
549 expiry,
550 options,
551 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
560 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext); 552 sc = GNUNET_new (struct GNUNET_PEERSTORE_StoreContext);
561 553
562 sc->sub_system = GNUNET_strdup (sub_system); 554 sc->sub_system = GNUNET_strdup (sub_system);
@@ -574,7 +566,6 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
574 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc); 566 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
575 GNUNET_MQ_send (h->mq, ev); 567 GNUNET_MQ_send (h->mq, ev);
576 return sc; 568 return sc;
577
578} 569}
579 570
580 571
@@ -590,8 +581,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
590 * @param msg message received 581 * @param msg message received
591 */ 582 */
592static void 583static void
593handle_iterate_end (void *cls, 584handle_iterate_end (void *cls, const struct GNUNET_MessageHeader *msg)
594 const struct GNUNET_MessageHeader *msg)
595{ 585{
596 struct GNUNET_PEERSTORE_Handle *h = cls; 586 struct GNUNET_PEERSTORE_Handle *h = cls;
597 struct GNUNET_PEERSTORE_IterateContext *ic; 587 struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -602,7 +592,7 @@ handle_iterate_end (void *cls,
602 if (NULL == ic) 592 if (NULL == ic)
603 { 593 {
604 LOG (GNUNET_ERROR_TYPE_ERROR, 594 LOG (GNUNET_ERROR_TYPE_ERROR,
605 _("Unexpected iteration response, this should not happen.\n")); 595 _ ("Unexpected iteration response, this should not happen.\n"));
606 disconnect_and_schedule_reconnect (h); 596 disconnect_and_schedule_reconnect (h);
607 return; 597 return;
608 } 598 }
@@ -611,9 +601,7 @@ handle_iterate_end (void *cls,
611 ic->iterating = GNUNET_NO; 601 ic->iterating = GNUNET_NO;
612 GNUNET_PEERSTORE_iterate_cancel (ic); 602 GNUNET_PEERSTORE_iterate_cancel (ic);
613 if (NULL != callback) 603 if (NULL != callback)
614 callback (callback_cls, 604 callback (callback_cls, NULL, NULL);
615 NULL,
616 NULL);
617 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 605 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
618} 606}
619 607
@@ -626,8 +614,7 @@ handle_iterate_end (void *cls,
626 * @param msg message received 614 * @param msg message received
627 */ 615 */
628static int 616static int
629check_iterate_result (void *cls, 617check_iterate_result (void *cls, const struct StoreRecordMessage *msg)
630 const struct StoreRecordMessage *msg)
631{ 618{
632 /* we defer validation to #handle_iterate_result */ 619 /* we defer validation to #handle_iterate_result */
633 return GNUNET_OK; 620 return GNUNET_OK;
@@ -641,8 +628,7 @@ check_iterate_result (void *cls,
641 * @param msg message received 628 * @param msg message received
642 */ 629 */
643static void 630static void
644handle_iterate_result (void *cls, 631handle_iterate_result (void *cls, const struct StoreRecordMessage *msg)
645 const struct StoreRecordMessage *msg)
646{ 632{
647 struct GNUNET_PEERSTORE_Handle *h = cls; 633 struct GNUNET_PEERSTORE_Handle *h = cls;
648 struct GNUNET_PEERSTORE_IterateContext *ic; 634 struct GNUNET_PEERSTORE_IterateContext *ic;
@@ -654,7 +640,7 @@ handle_iterate_result (void *cls,
654 if (NULL == ic) 640 if (NULL == ic)
655 { 641 {
656 LOG (GNUNET_ERROR_TYPE_ERROR, 642 LOG (GNUNET_ERROR_TYPE_ERROR,
657 _("Unexpected iteration response, this should not happen.\n")); 643 _ ("Unexpected iteration response, this should not happen.\n"));
658 disconnect_and_schedule_reconnect (h); 644 disconnect_and_schedule_reconnect (h);
659 return; 645 return;
660 } 646 }
@@ -668,13 +654,11 @@ handle_iterate_result (void *cls,
668 { 654 {
669 callback (callback_cls, 655 callback (callback_cls,
670 NULL, 656 NULL,
671 _("Received a malformed response from service.")); 657 _ ("Received a malformed response from service."));
672 } 658 }
673 else 659 else
674 { 660 {
675 callback (callback_cls, 661 callback (callback_cls, record, NULL);
676 record,
677 NULL);
678 PEERSTORE_destroy_record (record); 662 PEERSTORE_destroy_record (record);
679 } 663 }
680} 664}
@@ -691,9 +675,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
691{ 675{
692 if (GNUNET_NO == ic->iterating) 676 if (GNUNET_NO == ic->iterating)
693 { 677 {
694 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, 678 GNUNET_CONTAINER_DLL_remove (ic->h->iterate_head, ic->h->iterate_tail, ic);
695 ic->h->iterate_tail,
696 ic);
697 GNUNET_free (ic->sub_system); 679 GNUNET_free (ic->sub_system);
698 GNUNET_free_non_null (ic->key); 680 GNUNET_free_non_null (ic->key);
699 GNUNET_free (ic); 681 GNUNET_free (ic);
@@ -725,13 +707,15 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
725 struct GNUNET_MQ_Envelope *ev; 707 struct GNUNET_MQ_Envelope *ev;
726 struct GNUNET_PEERSTORE_IterateContext *ic; 708 struct GNUNET_PEERSTORE_IterateContext *ic;
727 709
728 ev = PEERSTORE_create_record_mq_envelope (sub_system, 710 ev =
729 peer, 711 PEERSTORE_create_record_mq_envelope (sub_system,
730 key, 712 peer,
731 NULL, 0, 713 key,
732 GNUNET_TIME_UNIT_FOREVER_ABS, 714 NULL,
733 0, 715 0,
734 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 716 GNUNET_TIME_UNIT_FOREVER_ABS,
717 0,
718 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
735 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext); 719 ic = GNUNET_new (struct GNUNET_PEERSTORE_IterateContext);
736 ic->callback = callback; 720 ic->callback = callback;
737 ic->callback_cls = callback_cls; 721 ic->callback_cls = callback_cls;
@@ -741,9 +725,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
741 ic->peer = *peer; 725 ic->peer = *peer;
742 if (NULL != key) 726 if (NULL != key)
743 ic->key = GNUNET_strdup (key); 727 ic->key = GNUNET_strdup (key);
744 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, 728 GNUNET_CONTAINER_DLL_insert_tail (h->iterate_head, h->iterate_tail, ic);
745 h->iterate_tail,
746 ic);
747 LOG (GNUNET_ERROR_TYPE_DEBUG, 729 LOG (GNUNET_ERROR_TYPE_DEBUG,
748 "Sending an iterate request for sub system `%s'\n", 730 "Sending an iterate request for sub system `%s'\n",
749 sub_system); 731 sub_system);
@@ -763,8 +745,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
763 * @param msg message received 745 * @param msg message received
764 */ 746 */
765static int 747static int
766check_watch_record (void *cls, 748check_watch_record (void *cls, const struct StoreRecordMessage *msg)
767 const struct StoreRecordMessage *msg)
768{ 749{
769 /* we defer validation to #handle_watch_result */ 750 /* we defer validation to #handle_watch_result */
770 return GNUNET_OK; 751 return GNUNET_OK;
@@ -778,41 +759,33 @@ check_watch_record (void *cls,
778 * @param msg message received 759 * @param msg message received
779 */ 760 */
780static void 761static void
781handle_watch_record (void *cls, 762handle_watch_record (void *cls, const struct StoreRecordMessage *msg)
782 const struct StoreRecordMessage *msg)
783{ 763{
784 struct GNUNET_PEERSTORE_Handle *h = cls; 764 struct GNUNET_PEERSTORE_Handle *h = cls;
785 struct GNUNET_PEERSTORE_Record *record; 765 struct GNUNET_PEERSTORE_Record *record;
786 struct GNUNET_HashCode keyhash; 766 struct GNUNET_HashCode keyhash;
787 struct GNUNET_PEERSTORE_WatchContext *wc; 767 struct GNUNET_PEERSTORE_WatchContext *wc;
788 768
789 LOG (GNUNET_ERROR_TYPE_DEBUG, 769 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
790 "Received a watch record from service.\n");
791 record = PEERSTORE_parse_record_message (msg); 770 record = PEERSTORE_parse_record_message (msg);
792 if (NULL == record) 771 if (NULL == record)
793 { 772 {
794 disconnect_and_schedule_reconnect (h); 773 disconnect_and_schedule_reconnect (h);
795 return; 774 return;
796 } 775 }
797 PEERSTORE_hash_key (record->sub_system, 776 PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash);
798 &record->peer,
799 record->key,
800 &keyhash);
801 // FIXME: what if there are multiple watches for the same key? 777 // FIXME: what if there are multiple watches for the same key?
802 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, 778 wc = GNUNET_CONTAINER_multihashmap_get (h->watches, &keyhash);
803 &keyhash);
804 if (NULL == wc) 779 if (NULL == wc)
805 { 780 {
806 LOG (GNUNET_ERROR_TYPE_ERROR, 781 LOG (GNUNET_ERROR_TYPE_ERROR,
807 _("Received a watch result for a non existing watch.\n")); 782 _ ("Received a watch result for a non existing watch.\n"));
808 PEERSTORE_destroy_record (record); 783 PEERSTORE_destroy_record (record);
809 disconnect_and_schedule_reconnect (h); 784 disconnect_and_schedule_reconnect (h);
810 return; 785 return;
811 } 786 }
812 if (NULL != wc->callback) 787 if (NULL != wc->callback)
813 wc->callback (wc->callback_cls, 788 wc->callback (wc->callback_cls, record, NULL);
814 record,
815 NULL);
816 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; 789 h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
817 PEERSTORE_destroy_record (record); 790 PEERSTORE_destroy_record (record);
818} 791}
@@ -827,26 +800,24 @@ static void
827reconnect (void *cls) 800reconnect (void *cls)
828{ 801{
829 struct GNUNET_PEERSTORE_Handle *h = cls; 802 struct GNUNET_PEERSTORE_Handle *h = cls;
830 struct GNUNET_MQ_MessageHandler mq_handlers[] = { 803 struct GNUNET_MQ_MessageHandler mq_handlers[] =
831 GNUNET_MQ_hd_fixed_size (iterate_end, 804 {GNUNET_MQ_hd_fixed_size (iterate_end,
832 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, 805 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
833 struct GNUNET_MessageHeader, 806 struct GNUNET_MessageHeader,
834 h), 807 h),
835 GNUNET_MQ_hd_var_size (iterate_result, 808 GNUNET_MQ_hd_var_size (iterate_result,
836 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 809 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD,
837 struct StoreRecordMessage, 810 struct StoreRecordMessage,
838 h), 811 h),
839 GNUNET_MQ_hd_var_size (watch_record, 812 GNUNET_MQ_hd_var_size (watch_record,
840 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 813 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD,
841 struct StoreRecordMessage, 814 struct StoreRecordMessage,
842 h), 815 h),
843 GNUNET_MQ_handler_end () 816 GNUNET_MQ_handler_end ()};
844 };
845 struct GNUNET_MQ_Envelope *ev; 817 struct GNUNET_MQ_Envelope *ev;
846 818
847 h->reconnect_task = NULL; 819 h->reconnect_task = NULL;
848 LOG (GNUNET_ERROR_TYPE_DEBUG, 820 LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
849 "Reconnecting...\n");
850 h->mq = GNUNET_CLIENT_connect (h->cfg, 821 h->mq = GNUNET_CLIENT_connect (h->cfg,
851 "peerstore", 822 "peerstore",
852 mq_handlers, 823 mq_handlers,
@@ -857,39 +828,35 @@ reconnect (void *cls)
857 LOG (GNUNET_ERROR_TYPE_DEBUG, 828 LOG (GNUNET_ERROR_TYPE_DEBUG,
858 "Resending pending requests after reconnect.\n"); 829 "Resending pending requests after reconnect.\n");
859 if (NULL != h->watches) 830 if (NULL != h->watches)
860 GNUNET_CONTAINER_multihashmap_iterate (h->watches, 831 GNUNET_CONTAINER_multihashmap_iterate (h->watches, &rewatch_it, h);
861 &rewatch_it, 832 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head; NULL != ic;
862 h);
863 for (struct GNUNET_PEERSTORE_IterateContext *ic = h->iterate_head;
864 NULL != ic;
865 ic = ic->next) 833 ic = ic->next)
866 { 834 {
867 ev = PEERSTORE_create_record_mq_envelope (ic->sub_system, 835 ev =
868 &ic->peer, 836 PEERSTORE_create_record_mq_envelope (ic->sub_system,
869 ic->key, 837 &ic->peer,
870 NULL, 0, 838 ic->key,
871 GNUNET_TIME_UNIT_FOREVER_ABS, 839 NULL,
872 0, 840 0,
873 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE); 841 GNUNET_TIME_UNIT_FOREVER_ABS,
842 0,
843 GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
874 GNUNET_MQ_send (h->mq, ev); 844 GNUNET_MQ_send (h->mq, ev);
875 } 845 }
876 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; 846 for (struct GNUNET_PEERSTORE_StoreContext *sc = h->store_head; NULL != sc;
877 NULL != sc;
878 sc = sc->next) 847 sc = sc->next)
879 { 848 {
880 ev = PEERSTORE_create_record_mq_envelope (sc->sub_system, 849 ev =
881 &sc->peer, 850 PEERSTORE_create_record_mq_envelope (sc->sub_system,
882 sc->key, 851 &sc->peer,
883 sc->value, 852 sc->key,
884 sc->size, 853 sc->value,
885 sc->expiry, 854 sc->size,
886 sc->options, 855 sc->expiry,
887 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE); 856 sc->options,
888 GNUNET_MQ_notify_sent (ev, 857 GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
889 &store_request_sent, 858 GNUNET_MQ_notify_sent (ev, &store_request_sent, sc);
890 sc); 859 GNUNET_MQ_send (h->mq, ev);
891 GNUNET_MQ_send (h->mq,
892 ev);
893 } 860 }
894} 861}
895 862
@@ -906,15 +873,13 @@ GNUNET_PEERSTORE_watch_cancel (struct GNUNET_PEERSTORE_WatchContext *wc)
906 struct GNUNET_MQ_Envelope *ev; 873 struct GNUNET_MQ_Envelope *ev;
907 struct StoreKeyHashMessage *hm; 874 struct StoreKeyHashMessage *hm;
908 875
909 LOG (GNUNET_ERROR_TYPE_DEBUG, 876 LOG (GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
910 "Canceling watch.\n"); 877 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
911 ev = GNUNET_MQ_msg (hm,
912 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
913 hm->keyhash = wc->keyhash; 878 hm->keyhash = wc->keyhash;
914 GNUNET_MQ_send (h->mq, ev); 879 GNUNET_MQ_send (h->mq, ev);
915 GNUNET_CONTAINER_multihashmap_remove (h->watches, 880 GNUNET_assert (
916 &wc->keyhash, 881 GNUNET_YES ==
917 wc); 882 GNUNET_CONTAINER_multihashmap_remove (h->watches, &wc->keyhash, wc));
918 GNUNET_free (wc); 883 GNUNET_free (wc);
919} 884}
920 885
@@ -943,32 +908,26 @@ GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
943 struct StoreKeyHashMessage *hm; 908 struct StoreKeyHashMessage *hm;
944 struct GNUNET_PEERSTORE_WatchContext *wc; 909 struct GNUNET_PEERSTORE_WatchContext *wc;
945 910
946 ev = GNUNET_MQ_msg (hm, 911 ev = GNUNET_MQ_msg (hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
947 GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); 912 PEERSTORE_hash_key (sub_system, peer, key, &hm->keyhash);
948 PEERSTORE_hash_key (sub_system,
949 peer,
950 key,
951 &hm->keyhash);
952 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext); 913 wc = GNUNET_new (struct GNUNET_PEERSTORE_WatchContext);
953 wc->callback = callback; 914 wc->callback = callback;
954 wc->callback_cls = callback_cls; 915 wc->callback_cls = callback_cls;
955 wc->h = h; 916 wc->h = h;
956 wc->keyhash = hm->keyhash; 917 wc->keyhash = hm->keyhash;
957 if (NULL == h->watches) 918 if (NULL == h->watches)
958 h->watches = GNUNET_CONTAINER_multihashmap_create (5, 919 h->watches = GNUNET_CONTAINER_multihashmap_create (5, GNUNET_NO);
959 GNUNET_NO); 920 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
960 GNUNET_assert (GNUNET_OK == 921 h->watches,
961 GNUNET_CONTAINER_multihashmap_put (h->watches, 922 &wc->keyhash,
962 &wc->keyhash, 923 wc,
963 wc, 924 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
964 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
965 LOG (GNUNET_ERROR_TYPE_DEBUG, 925 LOG (GNUNET_ERROR_TYPE_DEBUG,
966 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n", 926 "Sending a watch request for subsystem `%s', peer `%s', key `%s'.\n",
967 sub_system, 927 sub_system,
968 GNUNET_i2s (peer), 928 GNUNET_i2s (peer),
969 key); 929 key);
970 GNUNET_MQ_send (h->mq, 930 GNUNET_MQ_send (h->mq, ev);
971 ev);
972 return wc; 931 return wc;
973} 932}
974 933