aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
commita78990b412db2c0ead2da8061c4f454f068991d1 (patch)
tree2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs
parent406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff)
downloadgnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz
gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.c228
-rw-r--r--src/fs/gnunet-service-fs.h26
-rw-r--r--src/fs/gnunet-service-fs_cp.c612
-rw-r--r--src/fs/gnunet-service-fs_cp.h94
-rw-r--r--src/fs/gnunet-service-fs_pe.c165
-rw-r--r--src/fs/gnunet-service-fs_pr.c93
-rw-r--r--src/fs/gnunet-service-fs_pr.h62
-rw-r--r--src/fs/gnunet-service-fs_push.c190
8 files changed, 519 insertions, 951 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 348bab092..bc0da09bc 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009-2014 GNUnet e.V. 3 Copyright (C) 2009-2014, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -177,7 +177,7 @@ static struct GNUNET_LOAD_Value *datastore_get_load;
177/** 177/**
178 * Identity of this peer. 178 * Identity of this peer.
179 */ 179 */
180static struct GNUNET_PeerIdentity my_id; 180struct GNUNET_PeerIdentity GSF_my_id;
181 181
182 182
183/** 183/**
@@ -277,33 +277,26 @@ update_latencies (void *cls,
277 277
278 278
279/** 279/**
280 * Handle P2P "PUT" message. 280 * Check P2P "PUT" message.
281 * 281 *
282 * @param cls closure, always NULL 282 * @param cls closure with the `struct GSF_ConnectedPeer`
283 * @param other the other peer involved (sender or receiver, NULL
284 * for loopback messages where we are both sender and receiver)
285 * @param message the actual message 283 * @param message the actual message
286 * @return #GNUNET_OK to keep the connection open, 284 * @return #GNUNET_OK to keep the connection open,
287 * #GNUNET_SYSERR to close it (signal serious error) 285 * #GNUNET_SYSERR to close it (signal serious error)
288 */ 286 */
289static int 287static int
290handle_p2p_put (void *cls, 288check_p2p_put (void *cls,
291 const struct GNUNET_PeerIdentity *other, 289 const struct PutMessage *put)
292 const struct GNUNET_MessageHeader *message)
293{ 290{
294 struct GSF_ConnectedPeer *cp; 291 enum GNUNET_BLOCK_Type type;
295 292
296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 293 type = ntohl (put->type);
297 "Received P2P PUT from %s\n", 294 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
298 GNUNET_i2s (other));
299 cp = GSF_peer_get_ (other);
300 if (NULL == cp)
301 { 295 {
302 GNUNET_break (0); 296 GNUNET_break_op (0);
303 return GNUNET_OK; 297 return GNUNET_SYSERR;
304 } 298 }
305 GSF_cover_content_count++; 299 return GNUNET_OK;
306 return GSF_handle_p2p_content_ (cp, message);
307} 300}
308 301
309 302
@@ -324,7 +317,8 @@ consider_request_for_forwarding (void *cls,
324{ 317{
325 struct GSF_PendingRequest *pr = cls; 318 struct GSF_PendingRequest *pr = cls;
326 319
327 if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer)) 320 if (GNUNET_YES !=
321 GSF_pending_request_test_target_ (pr, peer))
328 { 322 {
329#if INSANE_STATISTICS 323#if INSANE_STATISTICS
330 GNUNET_STATISTICS_update (GSF_stats, 324 GNUNET_STATISTICS_update (GSF_stats,
@@ -333,7 +327,8 @@ consider_request_for_forwarding (void *cls,
333#endif 327#endif
334 return; 328 return;
335 } 329 }
336 GSF_plan_add_ (cp, pr); 330 GSF_plan_add_ (cp,
331 pr);
337} 332}
338 333
339 334
@@ -347,10 +342,10 @@ consider_request_for_forwarding (void *cls,
347 * @param pr the pending request we were processing 342 * @param pr the pending request we were processing
348 * @param result final datastore lookup result 343 * @param result final datastore lookup result
349 */ 344 */
350static void 345void
351consider_forwarding (void *cls, 346GSF_consider_forwarding (void *cls,
352 struct GSF_PendingRequest *pr, 347 struct GSF_PendingRequest *pr,
353 enum GNUNET_BLOCK_EvaluationResult result) 348 enum GNUNET_BLOCK_EvaluationResult result)
354{ 349{
355 if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) 350 if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
356 return; /* we're done... */ 351 return; /* we're done... */
@@ -363,31 +358,44 @@ consider_forwarding (void *cls,
363 358
364 359
365/** 360/**
366 * Handle P2P "GET" request. 361 * Check P2P "GET" request.
367 * 362 *
368 * @param cls closure, always NULL 363 * @param cls closure
369 * @param other the other peer involved (sender or receiver, NULL 364 * @param gm the actual message
370 * for loopback messages where we are both sender and receiver)
371 * @param message the actual message
372 * @return #GNUNET_OK to keep the connection open, 365 * @return #GNUNET_OK to keep the connection open,
373 * #GNUNET_SYSERR to close it (signal serious error) 366 * #GNUNET_SYSERR to close it (signal serious error)
374 */ 367 */
375static int 368static int
376handle_p2p_get (void *cls, 369check_p2p_get (void *cls,
377 const struct GNUNET_PeerIdentity *other, 370 const struct GetMessage *gm)
378 const struct GNUNET_MessageHeader *message)
379{ 371{
380 struct GSF_PendingRequest *pr; 372 size_t msize;
381 373 unsigned int bm;
382 pr = GSF_handle_p2p_query_ (other, 374 unsigned int bits;
383 message); 375 size_t bfsize;
384 if (NULL == pr) 376
385 return GNUNET_OK; /* exists, identical to existing request, or malformed */ 377 msize = ntohs (gm->header.size);
386 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; 378 bm = ntohl (gm->hash_bitmap);
387 GSF_local_lookup_ (pr, 379 bits = 0;
388 &consider_forwarding, 380 while (bm > 0)
389 NULL); 381 {
390 return GNUNET_OK; 382 if (1 == (bm & 1))
383 bits++;
384 bm >>= 1;
385 }
386 if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
387 {
388 GNUNET_break_op (0);
389 return GNUNET_SYSERR;
390 }
391 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
392 /* bfsize must be power of 2, check! */
393 if (0 != ((bfsize - 1) & bfsize))
394 {
395 GNUNET_break_op (0);
396 return GNUNET_SYSERR;
397 }
398 return GNUNET_OK;
391} 399}
392 400
393 401
@@ -416,7 +424,8 @@ start_p2p_processing (void *cls,
416 prd = GSF_pending_request_get_data_ (pr); 424 prd = GSF_pending_request_get_data_ (pr);
417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418 "Finished database lookup for local request `%s' with result %d\n", 426 "Finished database lookup for local request `%s' with result %d\n",
419 GNUNET_h2s (&prd->query), result); 427 GNUNET_h2s (&prd->query),
428 result);
420 if (0 == prd->anonymity_level) 429 if (0 == prd->anonymity_level)
421 { 430 {
422 switch (prd->type) 431 switch (prd->type)
@@ -439,7 +448,7 @@ start_p2p_processing (void *cls,
439 break; 448 break;
440 } 449 }
441 } 450 }
442 consider_forwarding (NULL, pr, result); 451 GSF_consider_forwarding (NULL, pr, result);
443} 452}
444 453
445 454
@@ -538,7 +547,7 @@ shutdown_task (void *cls)
538 GSF_cadet_stop_server (); 547 GSF_cadet_stop_server ();
539 if (NULL != GSF_core) 548 if (NULL != GSF_core)
540 { 549 {
541 GNUNET_CORE_disconnect (GSF_core); 550 GNUNET_CORE_disconnecT (GSF_core);
542 GSF_core = NULL; 551 GSF_core = NULL;
543 } 552 }
544 if (NULL != GSF_ats) 553 if (NULL != GSF_ats)
@@ -575,80 +584,7 @@ shutdown_task (void *cls)
575 584
576 585
577/** 586/**
578 * Function called for each pending request whenever a new 587 * Function called after GNUNET_CORE_connecT has succeeded
579 * peer connects, giving us a chance to decide about submitting
580 * the existing request to the new peer.
581 *
582 * @param cls the `struct GSF_ConnectedPeer` of the new peer
583 * @param key query for the request
584 * @param pr handle to the pending request
585 * @return #GNUNET_YES to continue to iterate
586 */
587static int
588consider_peer_for_forwarding (void *cls,
589 const struct GNUNET_HashCode *key,
590 struct GSF_PendingRequest *pr)
591{
592 struct GSF_ConnectedPeer *cp = cls;
593 struct GNUNET_PeerIdentity pid;
594
595 if (GNUNET_YES !=
596 GSF_pending_request_test_active_ (pr))
597 return GNUNET_YES; /* request is not actually active, skip! */
598 GSF_connected_peer_get_identity_ (cp, &pid);
599 if (GNUNET_YES !=
600 GSF_pending_request_test_target_ (pr, &pid))
601 {
602 GNUNET_STATISTICS_update (GSF_stats,
603 gettext_noop ("# Loopback routes suppressed"),
604 1,
605 GNUNET_NO);
606 return GNUNET_YES;
607 }
608 GSF_plan_add_ (cp, pr);
609 return GNUNET_YES;
610}
611
612
613/**
614 * Function called after the creation of a connected peer record is complete.
615 *
616 * @param cls closure (unused)
617 * @param cp handle to the newly created connected peer record
618 */
619static void
620connected_peer_cb (void *cls,
621 struct GSF_ConnectedPeer *cp)
622{
623 if (NULL == cp)
624 return;
625 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
626 cp);
627}
628
629
630/**
631 * Method called whenever a given peer connects.
632 *
633 * @param cls closure, not used
634 * @param peer peer identity this notification is about
635 */
636static void
637peer_connect_handler (void *cls,
638 const struct GNUNET_PeerIdentity *peer)
639{
640 if (0 ==
641 GNUNET_CRYPTO_cmp_peer_identity (&my_id,
642 peer))
643 return;
644 GSF_peer_connect_handler_ (peer,
645 &connected_peer_cb,
646 NULL);
647}
648
649
650/**
651 * Function called after GNUNET_CORE_connect has succeeded
652 * (or failed for good). Note that the private key of the 588 * (or failed for good). Note that the private key of the
653 * peer is intentionally not exposed here; if you need it, 589 * peer is intentionally not exposed here; if you need it,
654 * your process should try to read the private key file 590 * your process should try to read the private key file
@@ -661,7 +597,7 @@ static void
661peer_init_handler (void *cls, 597peer_init_handler (void *cls,
662 const struct GNUNET_PeerIdentity *my_identity) 598 const struct GNUNET_PeerIdentity *my_identity)
663{ 599{
664 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&my_id, 600 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
665 my_identity)) 601 my_identity))
666 { 602 {
667 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 603 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -681,18 +617,23 @@ static int
681main_init (struct GNUNET_SERVER_Handle *server, 617main_init (struct GNUNET_SERVER_Handle *server,
682 const struct GNUNET_CONFIGURATION_Handle *c) 618 const struct GNUNET_CONFIGURATION_Handle *c)
683{ 619{
684 static const struct GNUNET_CORE_MessageHandler no_p2p_handlers[] = { 620 GNUNET_MQ_hd_var_size (p2p_get,
685 { NULL, 0, 0 } 621 GNUNET_MESSAGE_TYPE_FS_GET,
622 struct GetMessage);
623 GNUNET_MQ_hd_var_size (p2p_put,
624 GNUNET_MESSAGE_TYPE_FS_PUT,
625 struct PutMessage);
626 GNUNET_MQ_hd_fixed_size (p2p_migration_stop,
627 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
628 struct MigrationStopMessage);
629 struct GNUNET_MQ_MessageHandler no_p2p_handlers[] = {
630 GNUNET_MQ_handler_end ()
686 }; 631 };
687 static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = { 632 struct GNUNET_MQ_MessageHandler p2p_handlers[] = {
688 { &handle_p2p_get, 633 make_p2p_get_handler (NULL),
689 GNUNET_MESSAGE_TYPE_FS_GET, 0 }, 634 make_p2p_put_handler (NULL),
690 { &handle_p2p_put, 635 make_p2p_migration_stop_handler (NULL),
691 GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, 636 GNUNET_MQ_handler_end ()
692 { &GSF_handle_p2p_migration_stop_,
693 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
694 sizeof (struct MigrationStopMessage) },
695 { NULL, 0, 0 }
696 }; 637 };
697 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 638 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
698 { &GNUNET_FS_handle_index_start, NULL, 639 { &GNUNET_FS_handle_index_start, NULL,
@@ -735,28 +676,29 @@ main_init (struct GNUNET_SERVER_Handle *server,
735 GNUNET_free (keyfile); 676 GNUNET_free (keyfile);
736 GNUNET_assert (NULL != pk); 677 GNUNET_assert (NULL != pk);
737 GNUNET_CRYPTO_eddsa_key_get_public (pk, 678 GNUNET_CRYPTO_eddsa_key_get_public (pk,
738 &my_id.public_key); 679 &GSF_my_id.public_key);
739 680
740 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 681 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741 "I am peer %s\n", 682 "I am peer %s\n",
742 GNUNET_i2s (&my_id)); 683 GNUNET_i2s (&GSF_my_id));
743 GSF_core 684 GSF_core
744 = GNUNET_CORE_connect (GSF_cfg, NULL, 685 = GNUNET_CORE_connecT (GSF_cfg,
686 NULL,
745 &peer_init_handler, 687 &peer_init_handler,
746 &peer_connect_handler, 688 &GSF_peer_connect_handler,
747 &GSF_peer_disconnect_handler_, 689 &GSF_peer_disconnect_handler,
748 NULL, GNUNET_NO,
749 NULL, GNUNET_NO,
750 (GNUNET_YES == anon_p2p_off) 690 (GNUNET_YES == anon_p2p_off)
751 ? no_p2p_handlers 691 ? no_p2p_handlers
752 : p2p_handlers); 692 : p2p_handlers);
753 if (NULL == GSF_core) 693 if (NULL == GSF_core)
754 { 694 {
755 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 695 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
756 _("Failed to connect to `%s' service.\n"), "core"); 696 _("Failed to connect to `%s' service.\n"),
697 "core");
757 return GNUNET_SYSERR; 698 return GNUNET_SYSERR;
758 } 699 }
759 GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_, 700 GNUNET_SERVER_disconnect_notify (server,
701 &GSF_client_disconnect_handler_,
760 NULL); 702 NULL);
761 GNUNET_SERVER_add_handlers (server, handlers); 703 GNUNET_SERVER_add_handlers (server, handlers);
762 cover_age_task = 704 cover_age_task =
diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h
index 92cb4088e..2a0f7ba29 100644
--- a/src/fs/gnunet-service-fs.h
+++ b/src/fs/gnunet-service-fs.h
@@ -223,6 +223,10 @@ extern struct GNUNET_TIME_Relative GSF_avg_latency;
223 */ 223 */
224extern struct GNUNET_ATS_PerformanceHandle *GSF_ats; 224extern struct GNUNET_ATS_PerformanceHandle *GSF_ats;
225 225
226/**
227 * Identity of this peer.
228 */
229extern struct GNUNET_PeerIdentity GSF_my_id;
226 230
227/** 231/**
228 * Typical priorities we're seeing from other peers right now. Since 232 * Typical priorities we're seeing from other peers right now. Since
@@ -265,13 +269,29 @@ extern unsigned int GSF_datastore_queue_size;
265 269
266 270
267/** 271/**
272 * Function to be called after we're done processing
273 * replies from the local lookup. If the result status
274 * code indicates that there may be more replies, plan
275 * forwarding the request.
276 *
277 * @param cls closure (NULL)
278 * @param pr the pending request we were processing
279 * @param result final datastore lookup result
280 */
281void
282GSF_consider_forwarding (void *cls,
283 struct GSF_PendingRequest *pr,
284 enum GNUNET_BLOCK_EvaluationResult result);
285
286
287/**
268 * Test if the DATABASE (GET) load on this peer is too high 288 * Test if the DATABASE (GET) load on this peer is too high
269 * to even consider processing the query at 289 * to even consider processing the query at
270 * all. 290 * all.
271 * 291 *
272 * @return GNUNET_YES if the load is too high to do anything (load high) 292 * @return #GNUNET_YES if the load is too high to do anything (load high)
273 * GNUNET_NO to process normally (load normal) 293 * #GNUNET_NO to process normally (load normal)
274 * GNUNET_SYSERR to process for free (load low) 294 * #GNUNET_SYSERR to process for free (load low)
275 */ 295 */
276int 296int
277GSF_test_get_load_too_high_ (uint32_t priority); 297GSF_test_get_load_too_high_ (uint32_t priority);
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index bda33d766..3f7783ded 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2011 GNUnet e.V. 3 Copyright (C) 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -78,19 +78,9 @@ struct GSF_PeerTransmitHandle
78 struct GNUNET_TIME_Absolute transmission_request_start_time; 78 struct GNUNET_TIME_Absolute transmission_request_start_time;
79 79
80 /** 80 /**
81 * Timeout for this request. 81 * Envelope with the actual message.
82 */ 82 */
83 struct GNUNET_TIME_Absolute timeout; 83 struct GNUNET_MQ_Envelope *env;
84
85 /**
86 * Task called on timeout, or 0 for none.
87 */
88 struct GNUNET_SCHEDULER_Task *timeout_task;
89
90 /**
91 * Function to call to get the actual message.
92 */
93 GSF_GetMessageCallback gmc;
94 84
95 /** 85 /**
96 * Peer this request targets. 86 * Peer this request targets.
@@ -98,16 +88,6 @@ struct GSF_PeerTransmitHandle
98 struct GSF_ConnectedPeer *cp; 88 struct GSF_ConnectedPeer *cp;
99 89
100 /** 90 /**
101 * Closure for @e gmc.
102 */
103 void *gmc_cls;
104
105 /**
106 * Size of the message to be transmitted.
107 */
108 size_t size;
109
110 /**
111 * #GNUNET_YES if this is a query, #GNUNET_NO for content. 91 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112 */ 92 */
113 int is_query; 93 int is_query;
@@ -147,9 +127,9 @@ struct GSF_DelayedHandle
147 struct GSF_ConnectedPeer *cp; 127 struct GSF_ConnectedPeer *cp;
148 128
149 /** 129 /**
150 * The PUT that was delayed. 130 * Envelope of the message that was delayed.
151 */ 131 */
152 struct PutMessage *pm; 132 struct GNUNET_MQ_Envelope *env;
153 133
154 /** 134 /**
155 * Task for the delay. 135 * Task for the delay.
@@ -235,11 +215,6 @@ struct GSF_ConnectedPeer
235 struct GSF_DelayedHandle *delayed_tail; 215 struct GSF_DelayedHandle *delayed_tail;
236 216
237 /** 217 /**
238 * Migration stop message in our queue, or NULL if we have none pending.
239 */
240 struct GSF_PeerTransmitHandle *migration_pth;
241
242 /**
243 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). 218 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244 */ 219 */
245 struct GNUNET_ATS_ReservationContext *rc; 220 struct GNUNET_ATS_ReservationContext *rc;
@@ -256,9 +231,9 @@ struct GSF_ConnectedPeer
256 231
257 /** 232 /**
258 * Handle for an active request for transmission to this 233 * Handle for an active request for transmission to this
259 * peer, or NULL (if core queue was full). 234 * peer.
260 */ 235 */
261 struct GNUNET_CORE_TransmitHandle *cth; 236 struct GNUNET_MQ_Handle *mq;
262 237
263 /** 238 /**
264 * Increase in traffic preference still to be submitted 239 * Increase in traffic preference still to be submitted
@@ -267,14 +242,6 @@ struct GSF_ConnectedPeer
267 uint64_t inc_preference; 242 uint64_t inc_preference;
268 243
269 /** 244 /**
270 * Set to 1 if we're currently in the process of calling
271 * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
272 * NULL, we should not call notify_transmit_ready for this
273 * handle right now).
274 */
275 unsigned int cth_in_progress;
276
277 /**
278 * Number of entries in @e delayed_head DLL. 245 * Number of entries in @e delayed_head DLL.
279 */ 246 */
280 unsigned int delay_queue_size; 247 unsigned int delay_queue_size;
@@ -308,16 +275,6 @@ struct GSF_ConnectedPeer
308 int did_reserve; 275 int did_reserve;
309 276
310 /** 277 /**
311 * Function called when the creation of this record is complete.
312 */
313 GSF_ConnectedPeerCreationCallback creation_cb;
314
315 /**
316 * Closure for @e creation_cb
317 */
318 void *creation_cb_cls;
319
320 /**
321 * Handle to the PEERSTORE iterate request for peer respect value 278 * Handle to the PEERSTORE iterate request for peer respect value
322 */ 279 */
323 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; 280 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
@@ -377,15 +334,10 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
377/** 334/**
378 * Core is ready to transmit to a peer, get the message. 335 * Core is ready to transmit to a peer, get the message.
379 * 336 *
380 * @param cls the `struct GSF_PeerTransmitHandle` of the message 337 * @param cp which peer to send a message to
381 * @param size number of bytes core is willing to take
382 * @param buf where to copy the message
383 * @return number of bytes copied to @a buf
384 */ 338 */
385static size_t 339static void
386peer_transmit_ready_cb (void *cls, 340peer_transmit (struct GSF_ConnectedPeer *cp);
387 size_t size,
388 void *buf);
389 341
390 342
391/** 343/**
@@ -418,8 +370,6 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
418 struct GNUNET_PeerIdentity target; 370 struct GNUNET_PeerIdentity target;
419 371
420 cp = pth->cp; 372 cp = pth->cp;
421 if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
422 return; /* already done */
423 GNUNET_assert (0 != cp->ppd.pid); 373 GNUNET_assert (0 != cp->ppd.pid);
424 GNUNET_PEER_resolve (cp->ppd.pid, &target); 374 GNUNET_PEER_resolve (cp->ppd.pid, &target);
425 375
@@ -449,52 +399,23 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
449 cp); 399 cp);
450 return; 400 return;
451 } 401 }
452 GNUNET_assert (NULL == cp->cth); 402 peer_transmit (cp);
453 cp->cth_in_progress++;
454 cp->cth =
455 GNUNET_CORE_notify_transmit_ready (GSF_core,
456 GNUNET_YES,
457 GNUNET_CORE_PRIO_BACKGROUND,
458 GNUNET_TIME_absolute_get_remaining (pth->timeout),
459 &target,
460 pth->size,
461 &peer_transmit_ready_cb, cp);
462 GNUNET_assert (NULL != cp->cth);
463 GNUNET_assert (0 < cp->cth_in_progress--);
464} 403}
465 404
466 405
467/** 406/**
468 * Core is ready to transmit to a peer, get the message. 407 * Core is ready to transmit to a peer, get the message.
469 * 408 *
470 * @param cls the `struct GSF_PeerTransmitHandle` of the message 409 * @param cp which peer to send a message to
471 * @param size number of bytes core is willing to take
472 * @param buf where to copy the message
473 * @return number of bytes copied to @a buf
474 */ 410 */
475static size_t 411static void
476peer_transmit_ready_cb (void *cls, 412peer_transmit (struct GSF_ConnectedPeer *cp)
477 size_t size,
478 void *buf)
479{ 413{
480 struct GSF_ConnectedPeer *cp = cls;
481 struct GSF_PeerTransmitHandle *pth = cp->pth_head; 414 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
482 struct GSF_PeerTransmitHandle *pos; 415 struct GSF_PeerTransmitHandle *pos;
483 size_t ret;
484 416
485 cp->cth = NULL;
486 if (NULL == pth) 417 if (NULL == pth)
487 return 0; 418 return;
488 if (pth->size > size)
489 {
490 schedule_transmission (pth);
491 return 0;
492 }
493 if (NULL != pth->timeout_task)
494 {
495 GNUNET_SCHEDULER_cancel (pth->timeout_task);
496 pth->timeout_task = NULL;
497 }
498 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 419 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
499 cp->pth_tail, 420 cp->pth_tail,
500 pth); 421 pth);
@@ -512,14 +433,14 @@ peer_transmit_ready_cb (void *cls,
512 GNUNET_LOAD_update (cp->ppd.transmission_delay, 433 GNUNET_LOAD_update (cp->ppd.transmission_delay,
513 GNUNET_TIME_absolute_get_duration 434 GNUNET_TIME_absolute_get_duration
514 (pth->transmission_request_start_time).rel_value_us); 435 (pth->transmission_request_start_time).rel_value_us);
515 ret = pth->gmc (pth->gmc_cls, size, buf); 436 GNUNET_MQ_send (cp->mq,
437 pth->env);
438 GNUNET_free (pth);
516 if (NULL != (pos = cp->pth_head)) 439 if (NULL != (pos = cp->pth_head))
517 { 440 {
518 GNUNET_assert (pos != pth); 441 GNUNET_assert (pos != pth);
519 schedule_transmission (pos); 442 schedule_transmission (pos);
520 } 443 }
521 GNUNET_free (pth);
522 return ret;
523} 444}
524 445
525 446
@@ -578,23 +499,10 @@ ats_reserve_callback (void *cls,
578 } 499 }
579 cp->did_reserve = GNUNET_YES; 500 cp->did_reserve = GNUNET_YES;
580 pth = cp->pth_head; 501 pth = cp->pth_head;
581 if ( (NULL != pth) && 502 if (NULL != pth)
582 (NULL == cp->cth) &&
583 (0 == cp->cth_in_progress) )
584 { 503 {
585 /* reservation success, try transmission now! */ 504 /* reservation success, try transmission now! */
586 cp->cth_in_progress++; 505 peer_transmit (cp);
587 cp->cth =
588 GNUNET_CORE_notify_transmit_ready (GSF_core,
589 GNUNET_YES,
590 GNUNET_CORE_PRIO_BACKGROUND,
591 GNUNET_TIME_absolute_get_remaining (pth->timeout),
592 peer,
593 pth->size,
594 &peer_transmit_ready_cb,
595 cp);
596 GNUNET_assert (NULL != cp->cth);
597 GNUNET_assert (0 < cp->cth_in_progress--);
598 } 506 }
599} 507}
600 508
@@ -614,11 +522,13 @@ peer_respect_cb (void *cls,
614 struct GSF_ConnectedPeer *cp = cls; 522 struct GSF_ConnectedPeer *cp = cls;
615 523
616 GNUNET_assert (NULL != cp->respect_iterate_req); 524 GNUNET_assert (NULL != cp->respect_iterate_req);
617 if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size)) 525 if ( (NULL != record) &&
618 cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value); 526 (sizeof (cp->disk_respect) == record->value_size))
527 {
528 cp->disk_respect = *((uint32_t *)record->value);
529 cp->ppd.respect += *((uint32_t *)record->value);
530 }
619 GSF_push_start_ (cp); 531 GSF_push_start_ (cp);
620 if (NULL != cp->creation_cb)
621 cp->creation_cb (cp->creation_cb_cls, cp);
622 if (NULL != record) 532 if (NULL != record)
623 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); 533 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
624 cp->respect_iterate_req = NULL; 534 cp->respect_iterate_req = NULL;
@@ -626,25 +536,68 @@ peer_respect_cb (void *cls,
626 536
627 537
628/** 538/**
539 * Function called for each pending request whenever a new
540 * peer connects, giving us a chance to decide about submitting
541 * the existing request to the new peer.
542 *
543 * @param cls the `struct GSF_ConnectedPeer` of the new peer
544 * @param key query for the request
545 * @param pr handle to the pending request
546 * @return #GNUNET_YES to continue to iterate
547 */
548static int
549consider_peer_for_forwarding (void *cls,
550 const struct GNUNET_HashCode *key,
551 struct GSF_PendingRequest *pr)
552{
553 struct GSF_ConnectedPeer *cp = cls;
554 struct GNUNET_PeerIdentity pid;
555
556 if (GNUNET_YES !=
557 GSF_pending_request_test_active_ (pr))
558 return GNUNET_YES; /* request is not actually active, skip! */
559 GSF_connected_peer_get_identity_ (cp, &pid);
560 if (GNUNET_YES !=
561 GSF_pending_request_test_target_ (pr, &pid))
562 {
563 GNUNET_STATISTICS_update (GSF_stats,
564 gettext_noop ("# Loopback routes suppressed"),
565 1,
566 GNUNET_NO);
567 return GNUNET_YES;
568 }
569 GSF_plan_add_ (cp, pr);
570 return GNUNET_YES;
571}
572
573
574/**
629 * A peer connected to us. Setup the connected peer 575 * A peer connected to us. Setup the connected peer
630 * records. 576 * records.
631 * 577 *
578 * @param cls NULL
632 * @param peer identity of peer that connected 579 * @param peer identity of peer that connected
633 * @param creation_cb callback function when the record is created. 580 * @param mq message queue for talking to @a peer
634 * @param creation_cb_cls closure for @creation_cb 581 * @return our internal handle for the peer
635 */ 582 */
636void 583void *
637GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, 584GSF_peer_connect_handler (void *cls,
638 GSF_ConnectedPeerCreationCallback creation_cb, 585 const struct GNUNET_PeerIdentity *peer,
639 void *creation_cb_cls) 586 struct GNUNET_MQ_Handle *mq)
640{ 587{
641 struct GSF_ConnectedPeer *cp; 588 struct GSF_ConnectedPeer *cp;
642 589
590 if (0 ==
591 GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
592 peer))
593 return NULL;
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644 "Connected to peer %s\n", 595 "Connected to peer %s\n",
645 GNUNET_i2s (peer)); 596 GNUNET_i2s (peer));
646 cp = GNUNET_new (struct GSF_ConnectedPeer); 597 cp = GNUNET_new (struct GSF_ConnectedPeer);
647 cp->ppd.pid = GNUNET_PEER_intern (peer); 598 cp->ppd.pid = GNUNET_PEER_intern (peer);
599 cp->ppd.peer = peer;
600 cp->mq = mq;
648 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); 601 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
649 cp->rc = 602 cp->rc =
650 GNUNET_ATS_reserve_bandwidth (GSF_ats, 603 GNUNET_ATS_reserve_bandwidth (GSF_ats,
@@ -662,14 +615,17 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
662 gettext_noop ("# peers connected"), 615 gettext_noop ("# peers connected"),
663 GNUNET_CONTAINER_multipeermap_size (cp_map), 616 GNUNET_CONTAINER_multipeermap_size (cp_map),
664 GNUNET_NO); 617 GNUNET_NO);
665 cp->creation_cb = creation_cb; 618 cp->respect_iterate_req
666 cp->creation_cb_cls = creation_cb_cls; 619 = GNUNET_PEERSTORE_iterate (peerstore,
667 cp->respect_iterate_req = 620 "fs",
668 GNUNET_PEERSTORE_iterate (peerstore, "fs", 621 peer,
669 peer, "respect", 622 "respect",
670 GNUNET_TIME_UNIT_FOREVER_REL, 623 GNUNET_TIME_UNIT_FOREVER_REL,
671 &peer_respect_cb, 624 &peer_respect_cb,
672 cp); 625 cp);
626 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
627 cp);
628 return cp;
673} 629}
674 630
675 631
@@ -714,38 +670,25 @@ GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
714 670
715 671
716/** 672/**
717 * Handle P2P "MIGRATION_STOP" message. 673 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
718 * 674 *
719 * @param cls closure, always NULL 675 * @param cls closure, the `struct GSF_ConnectedPeer`
720 * @param other the other peer involved (sender or receiver, NULL 676 * @param msm the actual message
721 * for loopback messages where we are both sender and receiver)
722 * @param message the actual message
723 * @return #GNUNET_OK to keep the connection open,
724 * #GNUNET_SYSERR to close it (signal serious error)
725 */ 677 */
726int 678void
727GSF_handle_p2p_migration_stop_ (void *cls, 679handle_p2p_migration_stop (void *cls,
728 const struct GNUNET_PeerIdentity *other, 680 const struct MigrationStopMessage *msm)
729 const struct GNUNET_MessageHeader *message)
730{ 681{
731 struct GSF_ConnectedPeer *cp; 682 struct GSF_ConnectedPeer *cp = cls;
732 const struct MigrationStopMessage *msm;
733 struct GNUNET_TIME_Relative bt; 683 struct GNUNET_TIME_Relative bt;
734 684
735 msm = (const struct MigrationStopMessage *) message;
736 cp = GSF_peer_get_ (other);
737 if (NULL == cp)
738 {
739 GNUNET_break (0);
740 return GNUNET_OK;
741 }
742 GNUNET_STATISTICS_update (GSF_stats, 685 GNUNET_STATISTICS_update (GSF_stats,
743 gettext_noop ("# migration stop messages received"), 686 gettext_noop ("# migration stop messages received"),
744 1, GNUNET_NO); 687 1, GNUNET_NO);
745 bt = GNUNET_TIME_relative_ntoh (msm->duration); 688 bt = GNUNET_TIME_relative_ntoh (msm->duration);
746 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 689 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
747 _("Migration of content to peer `%s' blocked for %s\n"), 690 _("Migration of content to peer `%s' blocked for %s\n"),
748 GNUNET_i2s (other), 691 GNUNET_i2s (cp->ppd.peer),
749 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); 692 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
750 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); 693 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
751 if ( (NULL == cp->mig_revive_task) && 694 if ( (NULL == cp->mig_revive_task) &&
@@ -756,46 +699,6 @@ GSF_handle_p2p_migration_stop_ (void *cls,
756 GNUNET_SCHEDULER_add_delayed (bt, 699 GNUNET_SCHEDULER_add_delayed (bt,
757 &revive_migration, cp); 700 &revive_migration, cp);
758 } 701 }
759 return GNUNET_OK;
760}
761
762
763/**
764 * Copy reply and free put message.
765 *
766 * @param cls the `struct PutMessage`
767 * @param buf_size number of bytes available in @a buf
768 * @param buf where to copy the message, NULL on error (peer disconnect)
769 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
770 */
771static size_t
772copy_reply (void *cls,
773 size_t buf_size,
774 void *buf)
775{
776 struct PutMessage *pm = cls;
777 size_t size;
778
779 if (NULL != buf)
780 {
781 GNUNET_assert (buf_size >= ntohs (pm->header.size));
782 size = ntohs (pm->header.size);
783 GNUNET_memcpy (buf, pm, size);
784 GNUNET_STATISTICS_update (GSF_stats,
785 gettext_noop ("# replies transmitted to other peers"),
786 1,
787 GNUNET_NO);
788 }
789 else
790 {
791 size = 0;
792 GNUNET_STATISTICS_update (GSF_stats,
793 gettext_noop ("# replies dropped"),
794 1,
795 GNUNET_NO);
796 }
797 GNUNET_free (pm);
798 return size;
799} 702}
800 703
801 704
@@ -886,13 +789,10 @@ transmit_delayed_now (void *cls)
886 cp->delayed_tail, 789 cp->delayed_tail,
887 dh); 790 dh);
888 cp->delay_queue_size--; 791 cp->delay_queue_size--;
889 (void) GSF_peer_transmit_ (cp, 792 GSF_peer_transmit_ (cp,
890 GNUNET_NO, 793 GNUNET_NO,
891 UINT32_MAX, 794 UINT32_MAX,
892 REPLY_TIMEOUT, 795 dh->env);
893 dh->msize,
894 &copy_reply,
895 dh->pm);
896 GNUNET_free (dh); 796 GNUNET_free (dh);
897} 797}
898 798
@@ -954,6 +854,7 @@ handle_p2p_reply (void *cls,
954 struct PeerRequest *peerreq = cls; 854 struct PeerRequest *peerreq = cls;
955 struct GSF_ConnectedPeer *cp = peerreq->cp; 855 struct GSF_ConnectedPeer *cp = peerreq->cp;
956 struct GSF_PendingRequestData *prd; 856 struct GSF_PendingRequestData *prd;
857 struct GNUNET_MQ_Envelope *env;
957 struct PutMessage *pm; 858 struct PutMessage *pm;
958 size_t msize; 859 size_t msize;
959 860
@@ -1000,12 +901,14 @@ handle_p2p_reply (void *cls,
1000 GSF_cover_content_count -= (reply_anonymity_level - 1); 901 GSF_cover_content_count -= (reply_anonymity_level - 1);
1001 } 902 }
1002 903
1003 pm = GNUNET_malloc (msize); 904 env = GNUNET_MQ_msg_extra (pm,
1004 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); 905 data_len,
1005 pm->header.size = htons (msize); 906 GNUNET_MESSAGE_TYPE_FS_PUT);
1006 pm->type = htonl (type); 907 pm->type = htonl (type);
1007 pm->expiration = GNUNET_TIME_absolute_hton (expiration); 908 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
1008 GNUNET_memcpy (&pm[1], data, data_len); 909 GNUNET_memcpy (&pm[1],
910 data,
911 data_len);
1009 if ( (UINT32_MAX != reply_anonymity_level) && 912 if ( (UINT32_MAX != reply_anonymity_level) &&
1010 (0 != reply_anonymity_level) && 913 (0 != reply_anonymity_level) &&
1011 (GNUNET_YES == GSF_enable_randomized_delays) ) 914 (GNUNET_YES == GSF_enable_randomized_delays) )
@@ -1014,7 +917,7 @@ handle_p2p_reply (void *cls,
1014 917
1015 dh = GNUNET_new (struct GSF_DelayedHandle); 918 dh = GNUNET_new (struct GSF_DelayedHandle);
1016 dh->cp = cp; 919 dh->cp = cp;
1017 dh->pm = pm; 920 dh->env = env;
1018 dh->msize = msize; 921 dh->msize = msize;
1019 GNUNET_CONTAINER_DLL_insert (cp->delayed_head, 922 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1020 cp->delayed_tail, 923 cp->delayed_tail,
@@ -1027,13 +930,10 @@ handle_p2p_reply (void *cls,
1027 } 930 }
1028 else 931 else
1029 { 932 {
1030 (void) GSF_peer_transmit_ (cp, 933 GSF_peer_transmit_ (cp,
1031 GNUNET_NO, 934 GNUNET_NO,
1032 UINT32_MAX, 935 UINT32_MAX,
1033 REPLY_TIMEOUT, 936 env);
1034 msize,
1035 &copy_reply,
1036 pm);
1037 } 937 }
1038 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) 938 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1039 return; 939 return;
@@ -1265,23 +1165,20 @@ test_exist_cb (void *cls,
1265 * process replies properly. Does not initiate forwarding or 1165 * process replies properly. Does not initiate forwarding or
1266 * local database lookups. 1166 * local database lookups.
1267 * 1167 *
1268 * @param other the other peer involved (sender or receiver, NULL 1168 * @param cls the other peer involved (sender of the message)
1269 * for loopback messages where we are both sender and receiver) 1169 * @param gm the GET message
1270 * @param message the actual message
1271 * @return pending request handle, NULL on error
1272 */ 1170 */
1273struct GSF_PendingRequest * 1171void
1274GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 1172handle_p2p_get (void *cls,
1275 const struct GNUNET_MessageHeader *message) 1173 const struct GetMessage *gm)
1276{ 1174{
1175 struct GSF_ConnectedPeer *cps = cls;
1277 struct PeerRequest *peerreq; 1176 struct PeerRequest *peerreq;
1278 struct GSF_PendingRequest *pr; 1177 struct GSF_PendingRequest *pr;
1279 struct GSF_ConnectedPeer *cp; 1178 struct GSF_ConnectedPeer *cp;
1280 struct GSF_ConnectedPeer *cps;
1281 const struct GNUNET_PeerIdentity *target; 1179 const struct GNUNET_PeerIdentity *target;
1282 enum GSF_PendingRequestOptions options; 1180 enum GSF_PendingRequestOptions options;
1283 uint16_t msize; 1181 uint16_t msize;
1284 const struct GetMessage *gm;
1285 unsigned int bits; 1182 unsigned int bits;
1286 const struct GNUNET_PeerIdentity *opt; 1183 const struct GNUNET_PeerIdentity *opt;
1287 uint32_t bm; 1184 uint32_t bm;
@@ -1291,18 +1188,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1291 GNUNET_PEER_Id spid; 1188 GNUNET_PEER_Id spid;
1292 const struct GSF_PendingRequestData *prd; 1189 const struct GSF_PendingRequestData *prd;
1293 1190
1294 msize = ntohs (message->size); 1191 msize = ntohs (gm->header.size);
1295 if (msize < sizeof (struct GetMessage))
1296 {
1297 GNUNET_break_op (0);
1298 return NULL;
1299 }
1300 GNUNET_STATISTICS_update (GSF_stats,
1301 gettext_noop
1302 ("# GET requests received (from other peers)"),
1303 1,
1304 GNUNET_NO);
1305 gm = (const struct GetMessage *) message;
1306 tec.type = ntohl (gm->type); 1192 tec.type = ntohl (gm->type);
1307 bm = ntohl (gm->hash_bitmap); 1193 bm = ntohl (gm->hash_bitmap);
1308 bits = 0; 1194 bits = 0;
@@ -1312,32 +1198,16 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1312 bits++; 1198 bits++;
1313 bm >>= 1; 1199 bm >>= 1;
1314 } 1200 }
1315 if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1316 {
1317 GNUNET_break_op (0);
1318 return NULL;
1319 }
1320 opt = (const struct GNUNET_PeerIdentity *) &gm[1]; 1201 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1321 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); 1202 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1322 /* bfsize must be power of 2, check! */ 1203 GNUNET_STATISTICS_update (GSF_stats,
1323 if (0 != ((bfsize - 1) & bfsize)) 1204 gettext_noop
1324 { 1205 ("# GET requests received (from other peers)"),
1325 GNUNET_break_op (0); 1206 1,
1326 return NULL; 1207 GNUNET_NO);
1327 }
1328 GSF_cover_query_count++; 1208 GSF_cover_query_count++;
1329 bm = ntohl (gm->hash_bitmap); 1209 bm = ntohl (gm->hash_bitmap);
1330 bits = 0; 1210 bits = 0;
1331 cps = GSF_peer_get_ (other);
1332 if (NULL == cps)
1333 {
1334 /* peer must have just disconnected */
1335 GNUNET_STATISTICS_update (GSF_stats,
1336 gettext_noop
1337 ("# requests dropped due to initiator not being connected"),
1338 1, GNUNET_NO);
1339 return NULL;
1340 }
1341 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1211 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1342 cp = GSF_peer_get_ (&opt[bits++]); 1212 cp = GSF_peer_get_ (&opt[bits++]);
1343 else 1213 else
@@ -1352,24 +1222,24 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1352 else 1222 else
1353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354 "Failed to find peer `%s' in connection set. Dropping query.\n", 1224 "Failed to find peer `%s' in connection set. Dropping query.\n",
1355 GNUNET_i2s (other)); 1225 GNUNET_i2s (cps->ppd.peer));
1356 GNUNET_STATISTICS_update (GSF_stats, 1226 GNUNET_STATISTICS_update (GSF_stats,
1357 gettext_noop 1227 gettext_noop
1358 ("# requests dropped due to missing reverse route"), 1228 ("# requests dropped due to missing reverse route"),
1359 1, 1229 1,
1360 GNUNET_NO); 1230 GNUNET_NO);
1361 return NULL; 1231 return;
1362 } 1232 }
1363 if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) 1233 if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
1364 { 1234 {
1365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366 "Peer `%s' has too many replies queued already. Dropping query.\n", 1236 "Peer `%s' has too many replies queued already. Dropping query.\n",
1367 GNUNET_i2s (other)); 1237 GNUNET_i2s (cps->ppd.peer));
1368 GNUNET_STATISTICS_update (GSF_stats, 1238 GNUNET_STATISTICS_update (GSF_stats,
1369 gettext_noop ("# requests dropped due to full reply queue"), 1239 gettext_noop ("# requests dropped due to full reply queue"),
1370 1, 1240 1,
1371 GNUNET_NO); 1241 GNUNET_NO);
1372 return NULL; 1242 return;
1373 } 1243 }
1374 /* note that we can really only check load here since otherwise 1244 /* note that we can really only check load here since otherwise
1375 * peers could find out that we are overloaded by not being 1245 * peers could find out that we are overloaded by not being
@@ -1380,14 +1250,14 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1380 { 1250 {
1381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1382 "Dropping query from `%s', this peer is too busy.\n", 1252 "Dropping query from `%s', this peer is too busy.\n",
1383 GNUNET_i2s (other)); 1253 GNUNET_i2s (cps->ppd.peer));
1384 return NULL; 1254 return;
1385 } 1255 }
1386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387 "Received request for `%s' of type %u from peer `%s' with flags %u\n", 1257 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1388 GNUNET_h2s (&gm->query), 1258 GNUNET_h2s (&gm->query),
1389 (unsigned int) tec.type, 1259 (unsigned int) tec.type,
1390 GNUNET_i2s (other), 1260 GNUNET_i2s (cps->ppd.peer),
1391 (unsigned int) bm); 1261 (unsigned int) bm);
1392 target = 1262 target =
1393 (0 != 1263 (0 !=
@@ -1403,7 +1273,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1403 * so at best indirect the query */ 1273 * so at best indirect the query */
1404 tec.priority = 0; 1274 tec.priority = 0;
1405 options |= GSF_PRO_FORWARD_ONLY; 1275 options |= GSF_PRO_FORWARD_ONLY;
1406 spid = GNUNET_PEER_intern (other); 1276 spid = GNUNET_PEER_intern (cps->ppd.peer);
1407 GNUNET_assert (0 != spid); 1277 GNUNET_assert (0 != spid);
1408 } 1278 }
1409 tec.ttl = bound_ttl (ntohl (gm->ttl), 1279 tec.ttl = bound_ttl (ntohl (gm->ttl),
@@ -1412,11 +1282,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1412 ttl_decrement = 1282 ttl_decrement =
1413 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1283 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1414 TTL_DECREMENT); 1284 TTL_DECREMENT);
1415 if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0)) 1285 if ( (tec.ttl < 0) &&
1286 (((int32_t) (tec.ttl - ttl_decrement)) > 0) )
1416 { 1287 {
1417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418 "Dropping query from `%s' due to TTL underflow (%d - %u).\n", 1289 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1419 GNUNET_i2s (other), 1290 GNUNET_i2s (cps->ppd.peer),
1420 tec.ttl, 1291 tec.ttl,
1421 ttl_decrement); 1292 ttl_decrement);
1422 GNUNET_STATISTICS_update (GSF_stats, 1293 GNUNET_STATISTICS_update (GSF_stats,
@@ -1424,7 +1295,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1424 ("# requests dropped due TTL underflow"), 1, 1295 ("# requests dropped due TTL underflow"), 1,
1425 GNUNET_NO); 1296 GNUNET_NO);
1426 /* integer underflow => drop (should be very rare)! */ 1297 /* integer underflow => drop (should be very rare)! */
1427 return NULL; 1298 return;
1428 } 1299 }
1429 tec.ttl -= ttl_decrement; 1300 tec.ttl -= ttl_decrement;
1430 1301
@@ -1435,7 +1306,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1435 &test_exist_cb, 1306 &test_exist_cb,
1436 &tec); 1307 &tec);
1437 if (GNUNET_YES == tec.finished) 1308 if (GNUNET_YES == tec.finished)
1438 return NULL; /* merged into existing request, we're done */ 1309 return; /* merged into existing request, we're done */
1439 1310
1440 peerreq = GNUNET_new (struct PeerRequest); 1311 peerreq = GNUNET_new (struct PeerRequest);
1441 peerreq->cp = cp; 1312 peerreq->cp = cp;
@@ -1452,7 +1323,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1452 (uint32_t) tec.priority, 1323 (uint32_t) tec.priority,
1453 tec.ttl, 1324 tec.ttl,
1454 spid, 1325 spid,
1455 GNUNET_PEER_intern (other), 1326 GNUNET_PEER_intern (cps->ppd.peer),
1456 NULL, 0, /* replies_seen */ 1327 NULL, 0, /* replies_seen */
1457 &handle_p2p_reply, 1328 &handle_p2p_reply,
1458 peerreq); 1329 peerreq);
@@ -1472,43 +1343,10 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1472 gettext_noop ("# P2P searches active"), 1343 gettext_noop ("# P2P searches active"),
1473 1, 1344 1,
1474 GNUNET_NO); 1345 GNUNET_NO);
1475 return pr; 1346 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1476} 1347 GSF_local_lookup_ (pr,
1477 1348 &GSF_consider_forwarding,
1478 1349 NULL);
1479/**
1480 * Function called if there has been a timeout trying to satisfy
1481 * a transmission request.
1482 *
1483 * @param cls the `struct GSF_PeerTransmitHandle` of the request
1484 */
1485static void
1486peer_transmit_timeout (void *cls)
1487{
1488 struct GSF_PeerTransmitHandle *pth = cls;
1489 struct GSF_ConnectedPeer *cp;
1490
1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492 "Timeout trying to transmit to other peer\n");
1493 pth->timeout_task = NULL;
1494 cp = pth->cp;
1495 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1496 cp->pth_tail,
1497 pth);
1498 if (GNUNET_YES == pth->is_query)
1499 GNUNET_assert (0 < cp->ppd.pending_queries--);
1500 else if (GNUNET_NO == pth->is_query)
1501 GNUNET_assert (0 < cp->ppd.pending_replies--);
1502 GNUNET_LOAD_update (cp->ppd.transmission_delay,
1503 UINT64_MAX);
1504 if (NULL != cp->cth)
1505 {
1506 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1507 cp->cth = NULL;
1508 }
1509 pth->gmc (pth->gmc_cls, 0, NULL);
1510 GNUNET_assert (0 == cp->cth_in_progress);
1511 GNUNET_free (pth);
1512} 1350}
1513 1351
1514 1352
@@ -1520,19 +1358,15 @@ peer_transmit_timeout (void *cls)
1520 * @param cp target peer 1358 * @param cp target peer
1521 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) 1359 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1522 * @param priority how important is this request? 1360 * @param priority how important is this request?
1523 * @param timeout when does this request timeout (call gmc with error) 1361 * @param timeout when does this request timeout
1524 * @param size number of bytes we would like to send to the peer 1362 * @param size number of bytes we would like to send to the peer
1525 * @param gmc function to call to get the message 1363 * @param env message to send
1526 * @param gmc_cls closure for @a gmc
1527 * @return handle to cancel request
1528 */ 1364 */
1529struct GSF_PeerTransmitHandle * 1365void
1530GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, 1366GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1531 int is_query, 1367 int is_query,
1532 uint32_t priority, 1368 uint32_t priority,
1533 struct GNUNET_TIME_Relative timeout, 1369 struct GNUNET_MQ_Envelope *env)
1534 size_t size,
1535 GSF_GetMessageCallback gmc, void *gmc_cls)
1536{ 1370{
1537 struct GSF_PeerTransmitHandle *pth; 1371 struct GSF_PeerTransmitHandle *pth;
1538 struct GSF_PeerTransmitHandle *pos; 1372 struct GSF_PeerTransmitHandle *pos;
@@ -1540,10 +1374,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1540 1374
1541 pth = GNUNET_new (struct GSF_PeerTransmitHandle); 1375 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1542 pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); 1376 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1543 pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); 1377 pth->env = env;
1544 pth->gmc = gmc;
1545 pth->gmc_cls = gmc_cls;
1546 pth->size = size;
1547 pth->is_query = is_query; 1378 pth->is_query = is_query;
1548 pth->priority = priority; 1379 pth->priority = priority;
1549 pth->cp = cp; 1380 pth->cp = cp;
@@ -1563,39 +1394,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1563 cp->ppd.pending_queries++; 1394 cp->ppd.pending_queries++;
1564 else if (GNUNET_NO == is_query) 1395 else if (GNUNET_NO == is_query)
1565 cp->ppd.pending_replies++; 1396 cp->ppd.pending_replies++;
1566 pth->timeout_task
1567 = GNUNET_SCHEDULER_add_delayed (timeout,
1568 &peer_transmit_timeout,
1569 pth);
1570 schedule_transmission (pth); 1397 schedule_transmission (pth);
1571 return pth;
1572}
1573
1574
1575/**
1576 * Cancel an earlier request for transmission.
1577 *
1578 * @param pth request to cancel
1579 */
1580void
1581GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1582{
1583 struct GSF_ConnectedPeer *cp;
1584
1585 if (NULL != pth->timeout_task)
1586 {
1587 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1588 pth->timeout_task = NULL;
1589 }
1590 cp = pth->cp;
1591 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1592 cp->pth_tail,
1593 pth);
1594 if (GNUNET_YES == pth->is_query)
1595 GNUNET_assert (0 < cp->ppd.pending_queries--);
1596 else if (GNUNET_NO == pth->is_query)
1597 GNUNET_assert (0 < cp->ppd.pending_replies--);
1598 GNUNET_free (pth);
1599} 1398}
1600 1399
1601 1400
@@ -1683,7 +1482,9 @@ flush_respect (void *cls,
1683 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, 1482 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1684 sizeof (cp->ppd.respect), 1483 sizeof (cp->ppd.respect),
1685 GNUNET_TIME_UNIT_FOREVER_ABS, 1484 GNUNET_TIME_UNIT_FOREVER_ABS,
1686 GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); 1485 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1486 NULL,
1487 NULL);
1687 return GNUNET_OK; 1488 return GNUNET_OK;
1688} 1489}
1689 1490
@@ -1693,26 +1494,30 @@ flush_respect (void *cls,
1693 * record. 1494 * record.
1694 * 1495 *
1695 * @param cls unused 1496 * @param cls unused
1696 * @param peer identity of peer that connected 1497 * @param peer identity of peer that disconnected
1498 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1697 */ 1499 */
1698void 1500void
1699GSF_peer_disconnect_handler_ (void *cls, 1501GSF_peer_disconnect_handler (void *cls,
1700 const struct GNUNET_PeerIdentity *peer) 1502 const struct GNUNET_PeerIdentity *peer,
1503 void *internal_cls)
1701{ 1504{
1702 struct GSF_ConnectedPeer *cp; 1505 struct GSF_ConnectedPeer *cp = internal_cls;
1703 struct GSF_PeerTransmitHandle *pth; 1506 struct GSF_PeerTransmitHandle *pth;
1704 struct GSF_DelayedHandle *dh; 1507 struct GSF_DelayedHandle *dh;
1705 1508
1706 cp = GSF_peer_get_ (peer);
1707 if (NULL == cp) 1509 if (NULL == cp)
1708 return; /* must have been disconnect from core with 1510 return; /* must have been disconnect from core with
1709 * 'peer' == my_id, ignore */ 1511 * 'peer' == my_id, ignore */
1710 flush_respect (NULL, peer, cp); 1512 flush_respect (NULL,
1513 peer,
1514 cp);
1711 GNUNET_assert (GNUNET_YES == 1515 GNUNET_assert (GNUNET_YES ==
1712 GNUNET_CONTAINER_multipeermap_remove (cp_map, 1516 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1713 peer, 1517 peer,
1714 cp)); 1518 cp));
1715 GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"), 1519 GNUNET_STATISTICS_set (GSF_stats,
1520 gettext_noop ("# peers connected"),
1716 GNUNET_CONTAINER_multipeermap_size (cp_map), 1521 GNUNET_CONTAINER_multipeermap_size (cp_map),
1717 GNUNET_NO); 1522 GNUNET_NO);
1718 if (NULL != cp->respect_iterate_req) 1523 if (NULL != cp->respect_iterate_req)
@@ -1720,11 +1525,6 @@ GSF_peer_disconnect_handler_ (void *cls,
1720 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); 1525 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1721 cp->respect_iterate_req = NULL; 1526 cp->respect_iterate_req = NULL;
1722 } 1527 }
1723 if (NULL != cp->migration_pth)
1724 {
1725 GSF_peer_transmit_cancel_ (cp->migration_pth);
1726 cp->migration_pth = NULL;
1727 }
1728 if (NULL != cp->rc) 1528 if (NULL != cp->rc)
1729 { 1529 {
1730 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); 1530 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
@@ -1748,19 +1548,8 @@ GSF_peer_disconnect_handler_ (void *cls,
1748 0, 1548 0,
1749 sizeof (cp->ppd.last_p2p_replies)); 1549 sizeof (cp->ppd.last_p2p_replies));
1750 GSF_push_stop_ (cp); 1550 GSF_push_stop_ (cp);
1751 if (NULL != cp->cth)
1752 {
1753 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1754 cp->cth = NULL;
1755 }
1756 GNUNET_assert (0 == cp->cth_in_progress);
1757 while (NULL != (pth = cp->pth_head)) 1551 while (NULL != (pth = cp->pth_head))
1758 { 1552 {
1759 if (pth->timeout_task != NULL)
1760 {
1761 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1762 pth->timeout_task = NULL;
1763 }
1764 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 1553 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1765 cp->pth_tail, 1554 cp->pth_tail,
1766 pth); 1555 pth);
@@ -1768,7 +1557,6 @@ GSF_peer_disconnect_handler_ (void *cls,
1768 GNUNET_assert (0 < cp->ppd.pending_queries--); 1557 GNUNET_assert (0 < cp->ppd.pending_queries--);
1769 else if (GNUNET_NO == pth->is_query) 1558 else if (GNUNET_NO == pth->is_query)
1770 GNUNET_assert (0 < cp->ppd.pending_replies--); 1559 GNUNET_assert (0 < cp->ppd.pending_replies--);
1771 pth->gmc (pth->gmc_cls, 0, NULL);
1772 GNUNET_free (pth); 1560 GNUNET_free (pth);
1773 } 1561 }
1774 while (NULL != (dh = cp->delayed_head)) 1562 while (NULL != (dh = cp->delayed_head))
@@ -1776,9 +1564,9 @@ GSF_peer_disconnect_handler_ (void *cls,
1776 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, 1564 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1777 cp->delayed_tail, 1565 cp->delayed_tail,
1778 dh); 1566 dh);
1567 GNUNET_MQ_discard (dh->env);
1779 cp->delay_queue_size--; 1568 cp->delay_queue_size--;
1780 GNUNET_SCHEDULER_cancel (dh->delay_task); 1569 GNUNET_SCHEDULER_cancel (dh->delay_task);
1781 GNUNET_free (dh->pm);
1782 GNUNET_free (dh); 1570 GNUNET_free (dh);
1783 } 1571 }
1784 GNUNET_PEER_change_rc (cp->ppd.pid, -1); 1572 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
@@ -1883,40 +1671,6 @@ GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1883 1671
1884 1672
1885/** 1673/**
1886 * Assemble a migration stop message for transmission.
1887 *
1888 * @param cls the `struct GSF_ConnectedPeer` to use
1889 * @param size number of bytes we're allowed to write to @a buf
1890 * @param buf where to copy the message
1891 * @return number of bytes copied to @a buf
1892 */
1893static size_t
1894create_migration_stop_message (void *cls,
1895 size_t size,
1896 void *buf)
1897{
1898 struct GSF_ConnectedPeer *cp = cls;
1899 struct MigrationStopMessage msm;
1900
1901 cp->migration_pth = NULL;
1902 if (NULL == buf)
1903 return 0;
1904 GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1905 msm.header.size = htons (sizeof (struct MigrationStopMessage));
1906 msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1907 msm.reserved = htonl (0);
1908 msm.duration =
1909 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1910 (cp->last_migration_block));
1911 GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1912 GNUNET_STATISTICS_update (GSF_stats,
1913 gettext_noop ("# migration stop messages sent"),
1914 1, GNUNET_NO);
1915 return sizeof (struct MigrationStopMessage);
1916}
1917
1918
1919/**
1920 * Ask a peer to stop migrating data to us until the given point 1674 * Ask a peer to stop migrating data to us until the given point
1921 * in time. 1675 * in time.
1922 * 1676 *
@@ -1927,6 +1681,9 @@ void
1927GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, 1681GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1928 struct GNUNET_TIME_Absolute block_time) 1682 struct GNUNET_TIME_Absolute block_time)
1929{ 1683{
1684 struct GNUNET_MQ_Envelope *env;
1685 struct MigrationStopMessage *msm;
1686
1930 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) 1687 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1931 { 1688 {
1932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1939,13 +1696,20 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1939 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), 1696 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1940 GNUNET_YES)); 1697 GNUNET_YES));
1941 cp->last_migration_block = block_time; 1698 cp->last_migration_block = block_time;
1942 if (NULL != cp->migration_pth) 1699 env = GNUNET_MQ_msg (msm,
1943 GSF_peer_transmit_cancel_ (cp->migration_pth); 1700 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1944 cp->migration_pth = 1701 msm->reserved = htonl (0);
1945 GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX, 1702 msm->duration
1946 GNUNET_TIME_UNIT_FOREVER_REL, 1703 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1947 sizeof (struct MigrationStopMessage), 1704 (cp->last_migration_block));
1948 &create_migration_stop_message, cp); 1705 GNUNET_STATISTICS_update (GSF_stats,
1706 gettext_noop ("# migration stop messages sent"),
1707 1,
1708 GNUNET_NO);
1709 GSF_peer_transmit_ (cp,
1710 GNUNET_SYSERR,
1711 UINT32_MAX,
1712 env);
1949} 1713}
1950 1714
1951 1715
@@ -1998,24 +1762,6 @@ GSF_connected_peer_init_ ()
1998 1762
1999 1763
2000/** 1764/**
2001 * Iterator to free peer entries.
2002 *
2003 * @param cls closure, unused
2004 * @param key current key code
2005 * @param value value in the hash map (peer entry)
2006 * @return #GNUNET_YES (we should continue to iterate)
2007 */
2008static int
2009clean_peer (void *cls,
2010 const struct GNUNET_PeerIdentity *key,
2011 void *value)
2012{
2013 GSF_peer_disconnect_handler_ (NULL, key);
2014 return GNUNET_YES;
2015}
2016
2017
2018/**
2019 * Shutdown peer management subsystem. 1765 * Shutdown peer management subsystem.
2020 */ 1766 */
2021void 1767void
@@ -2024,9 +1770,6 @@ GSF_connected_peer_done_ ()
2024 GNUNET_CONTAINER_multipeermap_iterate (cp_map, 1770 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2025 &flush_respect, 1771 &flush_respect,
2026 NULL); 1772 NULL);
2027 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2028 &clean_peer,
2029 NULL);
2030 GNUNET_SCHEDULER_cancel (fr_task); 1773 GNUNET_SCHEDULER_cancel (fr_task);
2031 fr_task = NULL; 1774 fr_task = NULL;
2032 GNUNET_CONTAINER_multipeermap_destroy (cp_map); 1775 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
@@ -2072,7 +1815,8 @@ GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
2072{ 1815{
2073 if (NULL == cp_map) 1816 if (NULL == cp_map)
2074 return; /* already cleaned up */ 1817 return; /* already cleaned up */
2075 GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client, 1818 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1819 &clean_local_client,
2076 (void *) lc); 1820 (void *) lc);
2077} 1821}
2078 1822
diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h
index 491ab34e3..3aba5c6a6 100644
--- a/src/fs/gnunet-service-fs_cp.h
+++ b/src/fs/gnunet-service-fs_cp.h
@@ -120,11 +120,16 @@ struct GSF_PeerPerformanceData
120 double avg_priority; 120 double avg_priority;
121 121
122 /** 122 /**
123 * The peer's identity. 123 * The peer's identity (interned version).
124 */ 124 */
125 GNUNET_PEER_Id pid; 125 GNUNET_PEER_Id pid;
126 126
127 /** 127 /**
128 * The peer's identity (pointer).
129 */
130 const struct GNUNET_PeerIdentity *peer;
131
132 /**
128 * Respect rating for this peer 133 * Respect rating for this peer
129 */ 134 */
130 uint32_t respect; 135 uint32_t respect;
@@ -185,17 +190,6 @@ typedef void
185 190
186 191
187/** 192/**
188 * Function called after the creation of a connected peer record is complete.
189 *
190 * @param cls closure
191 * @param cp handle to the newly created connected peer record
192 */
193typedef void
194(*GSF_ConnectedPeerCreationCallback) (void *cls,
195 struct GSF_ConnectedPeer *cp);
196
197
198/**
199 * Handle to cancel a transmission request. 193 * Handle to cancel a transmission request.
200 */ 194 */
201struct GSF_PeerTransmitHandle; 195struct GSF_PeerTransmitHandle;
@@ -205,14 +199,15 @@ struct GSF_PeerTransmitHandle;
205 * A peer connected to us. Setup the connected peer 199 * A peer connected to us. Setup the connected peer
206 * records. 200 * records.
207 * 201 *
202 * @param cls NULL
208 * @param peer identity of peer that connected 203 * @param peer identity of peer that connected
209 * @param creation_cb callback function when the record is created. 204 * @param mq queue for sending messages to @a peer
210 * @param creation_cb_cls closure for @creation_cb 205 * @return internal handle for the peer
211 */ 206 */
212void 207void *
213GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, 208GSF_peer_connect_handler (void *cls,
214 GSF_ConnectedPeerCreationCallback creation_cb, 209 const struct GNUNET_PeerIdentity *peer,
215 void *creation_cb_cls); 210 struct GNUNET_MQ_Handle *mq);
216 211
217 212
218/** 213/**
@@ -242,30 +237,15 @@ GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
242 * the callback is invoked with a 'NULL' buffer. 237 * the callback is invoked with a 'NULL' buffer.
243 * 238 *
244 * @param cp target peer 239 * @param cp target peer
245 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) 240 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO)
246 * @param priority how important is this request? 241 * @param priority how important is this request?
247 * @param timeout when does this request timeout (call gmc with error) 242 * @param env envelope of message to send
248 * @param size number of bytes we would like to send to the peer
249 * @param gmc function to call to get the message
250 * @param gmc_cls closure for gmc
251 * @return handle to cancel request
252 */ 243 */
253struct GSF_PeerTransmitHandle * 244void
254GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, 245GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
255 int is_query, 246 int is_query,
256 uint32_t priority, 247 uint32_t priority,
257 struct GNUNET_TIME_Relative timeout, 248 struct GNUNET_MQ_Envelope *env);
258 size_t size, GSF_GetMessageCallback gmc,
259 void *gmc_cls);
260
261
262/**
263 * Cancel an earlier request for transmission.
264 *
265 * @param pth request to cancel
266 */
267void
268GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth);
269 249
270 250
271/** 251/**
@@ -307,35 +287,25 @@ GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
307 287
308 288
309/** 289/**
310 * Handle P2P "MIGRATION_STOP" message. 290 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
311 * 291 *
312 * @param cls closure, always NULL 292 * @param cls closure, the `struct GSF_ConnectedPeer`
313 * @param other the other peer involved (sender or receiver, NULL 293 * @param msm the actual message
314 * for loopback messages where we are both sender and receiver)
315 * @param message the actual message
316 * @return #GNUNET_OK to keep the connection open,
317 * #GNUNET_SYSERR to close it (signal serious error)
318 */ 294 */
319int 295void
320GSF_handle_p2p_migration_stop_ (void *cls, 296handle_p2p_migration_stop (void *cls,
321 const struct GNUNET_PeerIdentity *other, 297 const struct MigrationStopMessage *message);
322 const struct GNUNET_MessageHeader *message);
323 298
324 299
325/** 300/**
326 * Handle P2P "QUERY" message. Only responsible for creating the 301 * Handle P2P "QUERY" message.
327 * request entry itself and setting up reply callback and cancellation
328 * on peer disconnect. Does NOT execute the actual request strategy
329 * (planning) or local database operations.
330 * 302 *
331 * @param other the other peer involved (sender or receiver, NULL 303 * @param cls the `struct GSF_ConnectedPeer` of the other sender
332 * for loopback messages where we are both sender and receiver) 304 * @param gm the actual message
333 * @param message the actual message
334 * @return pending request handle, NULL on error
335 */ 305 */
336struct GSF_PendingRequest * 306void
337GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 307handle_p2p_get (void *cls,
338 const struct GNUNET_MessageHeader *message); 308 const struct GetMessage *gm);
339 309
340 310
341/** 311/**
@@ -366,10 +336,12 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
366 * 336 *
367 * @param cls unused 337 * @param cls unused
368 * @param peer identity of peer that connected 338 * @param peer identity of peer that connected
339 * @param internal_cls our `struct GSF_ConnectedPeer` for @a peer
369 */ 340 */
370void 341void
371GSF_peer_disconnect_handler_ (void *cls, 342GSF_peer_disconnect_handler (void *cls,
372 const struct GNUNET_PeerIdentity *peer); 343 const struct GNUNET_PeerIdentity *peer,
344 void *internal_cls);
373 345
374 346
375/** 347/**
diff --git a/src/fs/gnunet-service-fs_pe.c b/src/fs/gnunet-service-fs_pe.c
index b338c1a13..098c3d180 100644
--- a/src/fs/gnunet-service-fs_pe.c
+++ b/src/fs/gnunet-service-fs_pe.c
@@ -189,11 +189,6 @@ struct PeerPlan
189 struct GNUNET_CONTAINER_MultiHashMap *plan_map; 189 struct GNUNET_CONTAINER_MultiHashMap *plan_map;
190 190
191 /** 191 /**
192 * Current transmission request handle.
193 */
194 struct GSF_PeerTransmitHandle *pth;
195
196 /**
197 * Peer for which this is the plan. 192 * Peer for which this is the plan.
198 */ 193 */
199 struct GSF_ConnectedPeer *cp; 194 struct GSF_ConnectedPeer *cp;
@@ -202,6 +197,12 @@ struct PeerPlan
202 * Current task for executing the plan. 197 * Current task for executing the plan.
203 */ 198 */
204 struct GNUNET_SCHEDULER_Task *task; 199 struct GNUNET_SCHEDULER_Task *task;
200
201 /**
202 * Current message under transmission for the plan.
203 */
204 struct GNUNET_MQ_Envelope *env;
205
205}; 206};
206 207
207 208
@@ -241,15 +242,6 @@ get_rp_key (struct GSF_RequestPlan *rp)
241 242
242 243
243/** 244/**
244 * Figure out when and how to transmit to the given peer.
245 *
246 * @param cls the `struct GSF_ConnectedPeer` for transmission
247 */
248static void
249schedule_peer_transmission (void *cls);
250
251
252/**
253 * Insert the given request plan into the heap with the appropriate weight. 245 * Insert the given request plan into the heap with the appropriate weight.
254 * 246 *
255 * @param pp associated peer's plan 247 * @param pp associated peer's plan
@@ -329,21 +321,22 @@ plan (struct PeerPlan *pp,
329 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay); 321 rp->earliest_transmission = GNUNET_TIME_relative_to_absolute (delay);
330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
331 "Earliest (re)transmission for `%s' in %us\n", 323 "Earliest (re)transmission for `%s' in %us\n",
332 GNUNET_h2s (&prd->query), rp->transmission_counter); 324 GNUNET_h2s (&prd->query),
325 rp->transmission_counter);
333 GNUNET_assert (rp->hn == NULL); 326 GNUNET_assert (rp->hn == NULL);
334 if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us) 327 if (0 == GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value_us)
335 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap, rp, rp->priority); 328 rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
329 rp,
330 rp->priority);
336 else 331 else
337 rp->hn = 332 rp->hn =
338 GNUNET_CONTAINER_heap_insert (pp->delay_heap, rp, 333 GNUNET_CONTAINER_heap_insert (pp->delay_heap,
334 rp,
339 rp->earliest_transmission.abs_value_us); 335 rp->earliest_transmission.abs_value_us);
340 GNUNET_assert (GNUNET_YES == 336 GNUNET_assert (GNUNET_YES ==
341 GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map, 337 GNUNET_CONTAINER_multihashmap_contains_value (pp->plan_map,
342 get_rp_key (rp), 338 get_rp_key (rp),
343 rp)); 339 rp));
344 if (NULL != pp->task)
345 GNUNET_SCHEDULER_cancel (pp->task);
346 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
347#undef N 340#undef N
348} 341}
349 342
@@ -383,75 +376,6 @@ get_latest (const struct GSF_RequestPlan *rp)
383 376
384 377
385/** 378/**
386 * Function called to get a message for transmission.
387 *
388 * @param cls closure
389 * @param buf_size number of bytes available in @a buf
390 * @param buf where to copy the message, NULL on error (peer disconnect)
391 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
392 */
393static size_t
394transmit_message_callback (void *cls,
395 size_t buf_size,
396 void *buf)
397{
398 struct PeerPlan *pp = cls;
399 struct GSF_RequestPlan *rp;
400 size_t msize;
401
402 pp->pth = NULL;
403 if (NULL == buf)
404 {
405 /* failed, try again... */
406 if (NULL != pp->task)
407 GNUNET_SCHEDULER_cancel (pp->task);
408
409 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
410 GNUNET_STATISTICS_update (GSF_stats,
411 gettext_noop
412 ("# transmission failed (core has no bandwidth)"),
413 1, GNUNET_NO);
414 return 0;
415 }
416 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap);
417 if (NULL == rp)
418 {
419 if (NULL != pp->task)
420 GNUNET_SCHEDULER_cancel (pp->task);
421 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
422 return 0;
423 }
424 msize = GSF_pending_request_get_message_ (get_latest (rp),
425 buf_size,
426 buf);
427 if (msize > buf_size)
428 {
429 if (NULL != pp->task)
430 GNUNET_SCHEDULER_cancel (pp->task);
431 /* buffer to small (message changed), try again */
432 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
433 return 0;
434 }
435 /* remove from root, add again elsewhere... */
436 GNUNET_assert (rp ==
437 GNUNET_CONTAINER_heap_remove_root (pp->priority_heap));
438 rp->hn = NULL;
439 rp->last_transmission = GNUNET_TIME_absolute_get ();
440 rp->transmission_counter++;
441 total_delay++;
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Executing plan %p executed %u times, planning retransmission\n",
444 rp, rp->transmission_counter);
445 plan (pp, rp);
446 GNUNET_STATISTICS_update (GSF_stats,
447 gettext_noop ("# query messages sent to other peers"),
448 1,
449 GNUNET_NO);
450 return msize;
451}
452
453
454/**
455 * Figure out when and how to transmit to the given peer. 379 * Figure out when and how to transmit to the given peer.
456 * 380 *
457 * @param cls the `struct PeerPlan` 381 * @param cls the `struct PeerPlan`
@@ -461,14 +385,16 @@ schedule_peer_transmission (void *cls)
461{ 385{
462 struct PeerPlan *pp = cls; 386 struct PeerPlan *pp = cls;
463 struct GSF_RequestPlan *rp; 387 struct GSF_RequestPlan *rp;
464 size_t msize;
465 struct GNUNET_TIME_Relative delay; 388 struct GNUNET_TIME_Relative delay;
466 389
467 pp->task = NULL; 390 if (NULL != pp->task)
468 if (NULL != pp->pth) 391 {
392 pp->task = NULL;
393 }
394 else
469 { 395 {
470 GSF_peer_transmit_cancel_ (pp->pth); 396 GNUNET_assert (NULL != pp->env);
471 pp->pth = NULL; 397 pp->env = NULL;
472 } 398 }
473 /* move ready requests to priority queue */ 399 /* move ready requests to priority queue */
474 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) && 400 while ((NULL != (rp = GNUNET_CONTAINER_heap_peek (pp->delay_heap))) &&
@@ -508,23 +434,40 @@ schedule_peer_transmission (void *cls)
508 return; 434 return;
509 } 435 }
510#if INSANE_STATISTICS 436#if INSANE_STATISTICS
511 GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# query plans executed"), 437 GNUNET_STATISTICS_update (GSF_stats,
512 1, GNUNET_NO); 438 gettext_noop ("# query plans executed"),
439 1,
440 GNUNET_NO);
513#endif 441#endif
514 /* process from priority heap */ 442 /* process from priority heap */
515 rp = GNUNET_CONTAINER_heap_peek (pp->priority_heap); 443 rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap);
516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 444 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
517 "Executing query plan %p\n", 445 "Executing query plan %p\n",
518 rp); 446 rp);
519 GNUNET_assert (NULL != rp); 447 GNUNET_assert (NULL != rp);
520 msize = GSF_pending_request_get_message_ (get_latest (rp), 0, NULL); 448 rp->hn = NULL;
521 pp->pth = 449 rp->last_transmission = GNUNET_TIME_absolute_get ();
522 GSF_peer_transmit_ (pp->cp, GNUNET_YES, 450 rp->transmission_counter++;
523 rp->priority, 451 total_delay++;
524 GNUNET_TIME_UNIT_FOREVER_REL, 452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 msize, 453 "Executing plan %p executed %u times, planning retransmission\n",
526 &transmit_message_callback, pp); 454 rp,
527 GNUNET_assert (NULL != pp->pth); 455 rp->transmission_counter);
456 GNUNET_assert (NULL == pp->env);
457 pp->env = GSF_pending_request_get_message_ (get_latest (rp));
458 GNUNET_MQ_notify_sent (pp->env,
459 &schedule_peer_transmission,
460 pp);
461 GSF_peer_transmit_ (pp->cp,
462 GNUNET_YES,
463 rp->priority,
464 pp->env);
465 GNUNET_STATISTICS_update (GSF_stats,
466 gettext_noop ("# query messages sent to other peers"),
467 1,
468 GNUNET_NO);
469 plan (pp,
470 rp);
528} 471}
529 472
530 473
@@ -646,6 +589,8 @@ GSF_plan_add_ (struct GSF_ConnectedPeer *cp,
646 id, 589 id,
647 pp, 590 pp,
648 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
592 pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
593 pp);
649 } 594 }
650 mpc.merged = GNUNET_NO; 595 mpc.merged = GNUNET_NO;
651 mpc.pr = pr; 596 mpc.pr = pr;
@@ -710,16 +655,16 @@ GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
710 GNUNET_assert (GNUNET_YES == 655 GNUNET_assert (GNUNET_YES ==
711 GNUNET_CONTAINER_multipeermap_remove (plans, id, 656 GNUNET_CONTAINER_multipeermap_remove (plans, id,
712 pp)); 657 pp));
713 if (NULL != pp->pth)
714 {
715 GSF_peer_transmit_cancel_ (pp->pth);
716 pp->pth = NULL;
717 }
718 if (NULL != pp->task) 658 if (NULL != pp->task)
719 { 659 {
720 GNUNET_SCHEDULER_cancel (pp->task); 660 GNUNET_SCHEDULER_cancel (pp->task);
721 pp->task = NULL; 661 pp->task = NULL;
722 } 662 }
663 if (NULL != pp->env)
664 {
665 GNUNET_MQ_send_cancel (pp->env);
666 pp->env = NULL;
667 }
723 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap))) 668 while (NULL != (rp = GNUNET_CONTAINER_heap_remove_root (pp->priority_heap)))
724 { 669 {
725 GNUNET_break (GNUNET_YES == 670 GNUNET_break (GNUNET_YES ==
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index cd58992c1..f8a7b61f0 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -512,21 +512,17 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
512 512
513/** 513/**
514 * Generate the message corresponding to the given pending request for 514 * Generate the message corresponding to the given pending request for
515 * transmission to other peers (or at least determine its size). 515 * transmission to other peers.
516 * 516 *
517 * @param pr request to generate the message for 517 * @param pr request to generate the message for
518 * @param buf_size number of bytes available in @a buf 518 * @return envelope with the request message
519 * @param buf where to copy the message (can be NULL)
520 * @return number of bytes needed (if `>` @a buf_size) or used
521 */ 519 */
522size_t 520struct GNUNET_MQ_Envelope *
523GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 521GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
524 size_t buf_size, void *buf)
525{ 522{
526 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; 523 struct GNUNET_MQ_Envelope *env;
527 struct GetMessage *gm; 524 struct GetMessage *gm;
528 struct GNUNET_PeerIdentity *ext; 525 struct GNUNET_PeerIdentity *ext;
529 size_t msize;
530 unsigned int k; 526 unsigned int k;
531 uint32_t bm; 527 uint32_t bm;
532 uint32_t prio; 528 uint32_t prio;
@@ -535,11 +531,10 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
535 int64_t ttl; 531 int64_t ttl;
536 int do_route; 532 int do_route;
537 533
538 if (buf_size > 0) 534 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 535 "Building request message for `%s' of type %d\n",
540 "Building request message for `%s' of type %d\n", 536 GNUNET_h2s (&pr->public_data.query),
541 GNUNET_h2s (&pr->public_data.query), 537 pr->public_data.type);
542 pr->public_data.type);
543 k = 0; 538 k = 0;
544 bm = 0; 539 bm = 0;
545 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); 540 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
@@ -559,13 +554,9 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
559 k++; 554 k++;
560 } 555 }
561 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); 556 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
562 msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity); 557 env = GNUNET_MQ_msg_extra (gm,
563 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 558 bf_size + k * sizeof (struct GNUNET_PeerIdentity),
564 if (buf_size < msize) 559 GNUNET_MESSAGE_TYPE_FS_GET);
565 return msize;
566 gm = (struct GetMessage *) lbuf;
567 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
568 gm->header.size = htons (msize);
569 gm->type = htonl (pr->public_data.type); 560 gm->type = htonl (pr->public_data.type);
570 if (do_route) 561 if (do_route)
571 prio = 562 prio =
@@ -585,7 +576,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
585 gm->query = pr->public_data.query; 576 gm->query = pr->public_data.query;
586 ext = (struct GNUNET_PeerIdentity *) &gm[1]; 577 ext = (struct GNUNET_PeerIdentity *) &gm[1];
587 k = 0; 578 k = 0;
588 if (!do_route) 579 if (! do_route)
589 GNUNET_PEER_resolve (pr->sender_pid, 580 GNUNET_PEER_resolve (pr->sender_pid,
590 &ext[k++]); 581 &ext[k++]);
591 if (NULL != pr->public_data.target) 582 if (NULL != pr->public_data.target)
@@ -595,8 +586,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
595 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, 586 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
596 (char *) &ext[k], 587 (char *) &ext[k],
597 bf_size)); 588 bf_size));
598 GNUNET_memcpy (buf, gm, msize); 589 return env;
599 return msize;
600} 590}
601 591
602 592
@@ -1699,18 +1689,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1699 * this content and possibly passes it on (to local clients or other 1689 * this content and possibly passes it on (to local clients or other
1700 * peers). Does NOT perform migration (content caching at this peer). 1690 * peers). Does NOT perform migration (content caching at this peer).
1701 * 1691 *
1702 * @param cp the other peer involved (sender or receiver, NULL 1692 * @param cls the other peer involved
1703 * for loopback messages where we are both sender and receiver) 1693 * @param put the actual message
1704 * @param message the actual message
1705 * @return #GNUNET_OK if the message was well-formed,
1706 * #GNUNET_SYSERR if the message was malformed (close connection,
1707 * do not cache under any circumstances)
1708 */ 1694 */
1709int 1695void
1710GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, 1696handle_p2p_put (void *cls,
1711 const struct GNUNET_MessageHeader *message) 1697 const struct PutMessage *put)
1712{ 1698{
1713 const struct PutMessage *put; 1699 struct GSF_ConnectedPeer *cp = cls;
1714 uint16_t msize; 1700 uint16_t msize;
1715 size_t dsize; 1701 size_t dsize;
1716 enum GNUNET_BLOCK_Type type; 1702 enum GNUNET_BLOCK_Type type;
@@ -1721,21 +1707,17 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1721 double putl; 1707 double putl;
1722 struct PutMigrationContext *pmc; 1708 struct PutMigrationContext *pmc;
1723 1709
1724 msize = ntohs (message->size); 1710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1725 if (msize < sizeof (struct PutMessage)) 1711 "Received P2P PUT from %s\n",
1726 { 1712 GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
1727 GNUNET_break_op (0); 1713 GSF_cover_content_count++;
1728 return GNUNET_SYSERR; 1714 msize = ntohs (put->header.size);
1729 }
1730 put = (const struct PutMessage *) message;
1731 dsize = msize - sizeof (struct PutMessage); 1715 dsize = msize - sizeof (struct PutMessage);
1732 type = ntohl (put->type); 1716 type = ntohl (put->type);
1733 expiration = GNUNET_TIME_absolute_ntoh (put->expiration); 1717 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1734 /* do not allow migrated content to live longer than 1 year */ 1718 /* do not allow migrated content to live longer than 1 year */
1735 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), 1719 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
1736 expiration); 1720 expiration);
1737 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1738 return GNUNET_SYSERR;
1739 if (GNUNET_OK != 1721 if (GNUNET_OK !=
1740 GNUNET_BLOCK_get_key (GSF_block_ctx, 1722 GNUNET_BLOCK_get_key (GSF_block_ctx,
1741 type, 1723 type,
@@ -1744,7 +1726,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1744 &query)) 1726 &query))
1745 { 1727 {
1746 GNUNET_break_op (0); 1728 GNUNET_break_op (0);
1747 return GNUNET_SYSERR; 1729 return;
1748 } 1730 }
1749 GNUNET_STATISTICS_update (GSF_stats, 1731 GNUNET_STATISTICS_update (GSF_stats,
1750 gettext_noop ("# GAP PUT messages received"), 1732 gettext_noop ("# GAP PUT messages received"),
@@ -1786,11 +1768,19 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1786 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, 1768 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1787 &pmc->origin); 1769 &pmc->origin);
1788 if (NULL == 1770 if (NULL ==
1789 GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, 1771 GNUNET_DATASTORE_put (GSF_dsh,
1790 prq.priority, 1 /* anonymity */ , 1772 0,
1773 &query,
1774 dsize,
1775 &put[1],
1776 type,
1777 prq.priority,
1778 1 /* anonymity */ ,
1791 0 /* replication */ , 1779 0 /* replication */ ,
1792 expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, 1780 expiration, 1 + prq.priority,
1793 &put_migration_continuation, pmc)) 1781 MAX_DATASTORE_QUEUE,
1782 &put_migration_continuation,
1783 pmc))
1794 { 1784 {
1795 put_migration_continuation (pmc, 1785 put_migration_continuation (pmc,
1796 GNUNET_SYSERR, 1786 GNUNET_SYSERR,
@@ -1802,7 +1792,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1802 { 1792 {
1803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804 "Choosing not to keep content `%s' (%d/%d)\n", 1794 "Choosing not to keep content `%s' (%d/%d)\n",
1805 GNUNET_h2s (&query), active_to_migration, 1795 GNUNET_h2s (&query),
1796 active_to_migration,
1806 test_put_load_too_high (prq.priority)); 1797 test_put_load_too_high (prq.priority));
1807 } 1798 }
1808 putl = GNUNET_LOAD_get_load (datastore_put_load); 1799 putl = GNUNET_LOAD_get_load (datastore_put_load);
@@ -1826,9 +1817,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1826 putl, 1817 putl,
1827 active_to_migration, 1818 active_to_migration,
1828 (GNUNET_NO == prq.request_found)); 1819 (GNUNET_NO == prq.request_found));
1829 GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); 1820 GSF_block_peer_migration_ (cp,
1821 GNUNET_TIME_relative_to_absolute (block_time));
1830 } 1822 }
1831 return GNUNET_OK;
1832} 1823}
1833 1824
1834 1825
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index 2765f9b3d..fe4297414 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -40,34 +40,34 @@ enum GSF_PendingRequestOptions
40 */ 40 */
41 GSF_PRO_DEFAULTS = 0, 41 GSF_PRO_DEFAULTS = 0,
42 42
43 /** 43 /**
44 * Request must only be processed locally. 44 * Request must only be processed locally.
45 */ 45 */
46 GSF_PRO_LOCAL_ONLY = 1, 46 GSF_PRO_LOCAL_ONLY = 1,
47 47
48 /** 48 /**
49 * Request must only be forwarded (no routing) 49 * Request must only be forwarded (no routing)
50 */ 50 */
51 GSF_PRO_FORWARD_ONLY = 2, 51 GSF_PRO_FORWARD_ONLY = 2,
52 52
53 /** 53 /**
54 * Request persists indefinitely (no expiration). 54 * Request persists indefinitely (no expiration).
55 */ 55 */
56 GSF_PRO_REQUEST_NEVER_EXPIRES = 4, 56 GSF_PRO_REQUEST_NEVER_EXPIRES = 4,
57 57
58 /** 58 /**
59 * Request is allowed to refresh bloomfilter and change mingle value. 59 * Request is allowed to refresh bloomfilter and change mingle value.
60 */ 60 */
61 GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8, 61 GSF_PRO_BLOOMFILTER_FULL_REFRESH = 8,
62 62
63 /** 63 /**
64 * Request priority is allowed to be exceeded. 64 * Request priority is allowed to be exceeded.
65 */ 65 */
66 GSF_PRO_PRIORITY_UNLIMITED = 16, 66 GSF_PRO_PRIORITY_UNLIMITED = 16,
67 67
68 /** 68 /**
69 * Option mask for typical local requests. 69 * Option mask for typical local requests.
70 */ 70 */
71 GSF_PRO_LOCAL_REQUEST = 71 GSF_PRO_LOCAL_REQUEST =
72 (GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES) 72 (GSF_PRO_BLOOMFILTER_FULL_REFRESH | GSF_PRO_PRIORITY_UNLIMITED | GSF_PRO_REQUEST_NEVER_EXPIRES)
73}; 73};
@@ -288,17 +288,13 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
288 288
289/** 289/**
290 * Generate the message corresponding to the given pending request for 290 * Generate the message corresponding to the given pending request for
291 * transmission to other peers (or at least determine its size). 291 * transmission to other peers.
292 * 292 *
293 * @param pr request to generate the message for 293 * @param pr request to generate the message for
294 * @param buf_size number of bytes available in @a buf 294 * @return envelope with the request message
295 * @param buf where to copy the message (can be NULL)
296 * @return number of bytes needed (if @a buf_size too small) or used
297 */ 295 */
298size_t 296struct GNUNET_MQ_Envelope *
299GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 297GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr);
300 size_t buf_size,
301 void *buf);
302 298
303 299
304/** 300/**
@@ -344,16 +340,12 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
344 * this content and possibly passes it on (to local clients or other 340 * this content and possibly passes it on (to local clients or other
345 * peers). Does NOT perform migration (content caching at this peer). 341 * peers). Does NOT perform migration (content caching at this peer).
346 * 342 *
347 * @param cp the other peer involved (sender or receiver, NULL 343 * @param cls the other peer involved (sender)
348 * for loopback messages where we are both sender and receiver) 344 * @param put the actual message
349 * @param message the actual message
350 * @return #GNUNET_OK if the message was well-formed,
351 * #GNUNET_SYSERR if the message was malformed (close connection,
352 * do not cache under any circumstances)
353 */ 345 */
354int 346void
355GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, 347handle_p2p_put (void *cls,
356 const struct GNUNET_MessageHeader *message); 348 const struct PutMessage *put);
357 349
358 350
359/** 351/**
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
index 59f3772f5..1573bc160 100644
--- a/src/fs/gnunet-service-fs_push.c
+++ b/src/fs/gnunet-service-fs_push.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2011 GNUnet e.V. 3 Copyright (C) 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -102,8 +102,7 @@ struct MigrationReadyBlock
102 102
103 103
104/** 104/**
105 * Information about a peer waiting for 105 * Information about a peer waiting for migratable data.
106 * migratable data.
107 */ 106 */
108struct MigrationReadyPeer 107struct MigrationReadyPeer
109{ 108{
@@ -123,15 +122,9 @@ struct MigrationReadyPeer
123 struct GSF_ConnectedPeer *peer; 122 struct GSF_ConnectedPeer *peer;
124 123
125 /** 124 /**
126 * Handle for current transmission request, 125 * Envelope of the currently pushed message.
127 * or NULL for none.
128 */ 126 */
129 struct GSF_PeerTransmitHandle *th; 127 struct GNUNET_MQ_Envelope *env;
130
131 /**
132 * Message we are trying to push right now (or NULL)
133 */
134 struct PutMessage *msg;
135}; 128};
136 129
137 130
@@ -163,7 +156,7 @@ static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
163/** 156/**
164 * ID of task that collects blocks for migration. 157 * ID of task that collects blocks for migration.
165 */ 158 */
166static struct GNUNET_SCHEDULER_Task * mig_task; 159static struct GNUNET_SCHEDULER_Task *mig_task;
167 160
168/** 161/**
169 * What is the maximum frequency at which we are allowed to 162 * What is the maximum frequency at which we are allowed to
@@ -195,8 +188,11 @@ static int value_found;
195static void 188static void
196delete_migration_block (struct MigrationReadyBlock *mb) 189delete_migration_block (struct MigrationReadyBlock *mb)
197{ 190{
198 GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); 191 GNUNET_CONTAINER_DLL_remove (mig_head,
199 GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); 192 mig_tail,
193 mb);
194 GNUNET_PEER_decrement_rcs (mb->target_list,
195 MIGRATION_LIST_SIZE);
200 mig_size--; 196 mig_size--;
201 GNUNET_free (mb); 197 GNUNET_free (mb);
202} 198}
@@ -204,49 +200,11 @@ delete_migration_block (struct MigrationReadyBlock *mb)
204 200
205/** 201/**
206 * Find content for migration to this peer. 202 * Find content for migration to this peer.
207 */
208static void
209find_content (struct MigrationReadyPeer *mrp);
210
211
212/**
213 * Transmit the message currently scheduled for transmission.
214 * 203 *
215 * @param cls the `struct MigrationReadyPeer` 204 * @param cls a `struct MigrationReadyPeer *`
216 * @param buf_size number of bytes available in @a buf
217 * @param buf where to copy the message, NULL on error (peer disconnect)
218 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
219 */ 205 */
220static size_t 206static void
221transmit_message (void *cls, 207find_content (void *cls);
222 size_t buf_size,
223 void *buf)
224{
225 struct MigrationReadyPeer *peer = cls;
226 struct PutMessage *msg;
227 uint16_t msize;
228
229 peer->th = NULL;
230 msg = peer->msg;
231 peer->msg = NULL;
232 if (NULL == buf)
233 {
234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235 "Failed to migrate content to another peer (disconnect)\n");
236 GNUNET_free (msg);
237 return 0;
238 }
239 msize = ntohs (msg->header.size);
240 GNUNET_assert (msize <= buf_size);
241 GNUNET_memcpy (buf, msg, msize);
242 GNUNET_free (msg);
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Pushing %u bytes to %s\n",
245 msize,
246 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer)));
247 find_content (peer);
248 return msize;
249}
250 208
251 209
252/** 210/**
@@ -257,31 +215,30 @@ transmit_message (void *cls,
257 * @return #GNUNET_YES if the block was deleted (!) 215 * @return #GNUNET_YES if the block was deleted (!)
258 */ 216 */
259static int 217static int
260transmit_content (struct MigrationReadyPeer *peer, 218transmit_content (struct MigrationReadyPeer *mrp,
261 struct MigrationReadyBlock *block) 219 struct MigrationReadyBlock *block)
262{ 220{
263 size_t msize;
264 struct PutMessage *msg; 221 struct PutMessage *msg;
265 unsigned int i; 222 unsigned int i;
266 struct GSF_PeerPerformanceData *ppd; 223 struct GSF_PeerPerformanceData *ppd;
267 int ret; 224 int ret;
268 225
269 ppd = GSF_get_peer_performance_data_ (peer->peer); 226 ppd = GSF_get_peer_performance_data_ (mrp->peer);
270 GNUNET_assert (NULL == peer->th); 227 mrp->env = GNUNET_MQ_msg_extra (msg,
271 msize = sizeof (struct PutMessage) + block->size; 228 block->size,
272 msg = GNUNET_malloc (msize); 229 GNUNET_MESSAGE_TYPE_FS_PUT);
273 msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
274 msg->header.size = htons (msize);
275 msg->type = htonl (block->type); 230 msg->type = htonl (block->type);
276 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); 231 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
277 GNUNET_memcpy (&msg[1], &block[1], block->size); 232 GNUNET_memcpy (&msg[1],
278 peer->msg = msg; 233 &block[1],
234 block->size);
279 for (i = 0; i < MIGRATION_LIST_SIZE; i++) 235 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
280 { 236 {
281 if (block->target_list[i] == 0) 237 if (block->target_list[i] == 0)
282 { 238 {
283 block->target_list[i] = ppd->pid; 239 block->target_list[i] = ppd->pid;
284 GNUNET_PEER_change_rc (block->target_list[i], 1); 240 GNUNET_PEER_change_rc (block->target_list[i],
241 1);
285 break; 242 break;
286 } 243 }
287 } 244 }
@@ -294,15 +251,13 @@ transmit_content (struct MigrationReadyPeer *peer,
294 { 251 {
295 ret = GNUNET_NO; 252 ret = GNUNET_NO;
296 } 253 }
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 254 GNUNET_MQ_notify_sent (mrp->env,
298 "Asking for transmission of %u bytes to %s for migration\n", 255 &find_content,
299 (unsigned int) msize, 256 mrp);
300 GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); 257 GSF_peer_transmit_ (mrp->peer,
301 peer->th = GSF_peer_transmit_ (peer->peer, 258 GNUNET_NO,
302 GNUNET_NO, 0 /* priority */ , 259 0 /* priority */ ,
303 GNUNET_TIME_UNIT_FOREVER_REL, 260 mrp->env);
304 msize,
305 &transmit_message, peer);
306 return ret; 261 return ret;
307} 262}
308 263
@@ -330,12 +285,12 @@ count_targets (struct MigrationReadyBlock *block)
330 * Check if sending this block to this peer would 285 * Check if sending this block to this peer would
331 * be a good idea. 286 * be a good idea.
332 * 287 *
333 * @param peer target peer 288 * @param mrp target peer
334 * @param block the block 289 * @param block the block
335 * @return score (>= 0: feasible, negative: infeasible) 290 * @return score (>= 0: feasible, negative: infeasible)
336 */ 291 */
337static long 292static long
338score_content (struct MigrationReadyPeer *peer, 293score_content (struct MigrationReadyPeer *mrp,
339 struct MigrationReadyBlock *block) 294 struct MigrationReadyBlock *block)
340{ 295{
341 unsigned int i; 296 unsigned int i;
@@ -344,14 +299,18 @@ score_content (struct MigrationReadyPeer *peer,
344 struct GNUNET_HashCode hc; 299 struct GNUNET_HashCode hc;
345 uint32_t dist; 300 uint32_t dist;
346 301
347 ppd = GSF_get_peer_performance_data_ (peer->peer); 302 ppd = GSF_get_peer_performance_data_ (mrp->peer);
348 for (i = 0; i < MIGRATION_LIST_SIZE; i++) 303 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
349 if (block->target_list[i] == ppd->pid) 304 if (block->target_list[i] == ppd->pid)
350 return -1; 305 return -1;
351 GNUNET_assert (0 != ppd->pid); 306 GNUNET_assert (0 != ppd->pid);
352 GNUNET_PEER_resolve (ppd->pid, &id); 307 GNUNET_PEER_resolve (ppd->pid,
353 GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); 308 &id);
354 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); 309 GNUNET_CRYPTO_hash (&id,
310 sizeof (struct GNUNET_PeerIdentity),
311 &hc);
312 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
313 &hc);
355 /* closer distance, higher score: */ 314 /* closer distance, higher score: */
356 return UINT32_MAX - dist; 315 return UINT32_MAX - dist;
357} 316}
@@ -368,17 +327,18 @@ consider_gathering (void);
368/** 327/**
369 * Find content for migration to this peer. 328 * Find content for migration to this peer.
370 * 329 *
371 * @param mrp peer to find content for 330 * @param cls peer to find content for
372 */ 331 */
373static void 332static void
374find_content (struct MigrationReadyPeer *mrp) 333find_content (void *cls)
375{ 334{
335 struct MigrationReadyPeer *mrp = cls;
376 struct MigrationReadyBlock *pos; 336 struct MigrationReadyBlock *pos;
377 long score; 337 long score;
378 long best_score; 338 long best_score;
379 struct MigrationReadyBlock *best; 339 struct MigrationReadyBlock *best;
380 340
381 GNUNET_assert (NULL == mrp->th); 341 mrp->env = NULL;
382 best = NULL; 342 best = NULL;
383 best_score = -1; 343 best_score = -1;
384 pos = mig_head; 344 pos = mig_head;
@@ -423,7 +383,8 @@ find_content (struct MigrationReadyPeer *mrp)
423 } 383 }
424 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425 "Preparing to push best content to peer\n"); 385 "Preparing to push best content to peer\n");
426 transmit_content (mrp, best); 386 transmit_content (mrp,
387 best);
427} 388}
428 389
429 390
@@ -454,9 +415,12 @@ consider_gathering ()
454 return; 415 return;
455 if (mig_size >= MAX_MIGRATION_QUEUE) 416 if (mig_size >= MAX_MIGRATION_QUEUE)
456 return; 417 return;
457 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); 418 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
458 delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); 419 mig_size);
459 delay = GNUNET_TIME_relative_max (delay, min_migration_delay); 420 delay = GNUNET_TIME_relative_divide (delay,
421 MAX_MIGRATION_QUEUE);
422 delay = GNUNET_TIME_relative_max (delay,
423 min_migration_delay);
460 if (GNUNET_NO == value_found) 424 if (GNUNET_NO == value_found)
461 { 425 {
462 /* wait at least 5s if the datastore is empty */ 426 /* wait at least 5s if the datastore is empty */
@@ -467,8 +431,9 @@ consider_gathering ()
467 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 431 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
468 "Scheduling gathering task (queue size: %u)\n", 432 "Scheduling gathering task (queue size: %u)\n",
469 mig_size); 433 mig_size);
470 mig_task = 434 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
471 GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); 435 &gather_migration_blocks,
436 NULL);
472} 437}
473 438
474 439
@@ -549,14 +514,12 @@ process_migration_content (void *cls,
549 mig_size++; 514 mig_size++;
550 for (pos = peer_head; NULL != pos; pos = pos->next) 515 for (pos = peer_head; NULL != pos; pos = pos->next)
551 { 516 {
552 if (NULL == pos->th) 517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553 { 518 "Preparing to push best content to peer %s\n",
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 519 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer)));
555 "Preparing to push best content to peer %s\n", 520 if (GNUNET_YES == transmit_content (pos,
556 GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); 521 mb))
557 if (GNUNET_YES == transmit_content (pos, mb)) 522 break; /* 'mb' was freed! */
558 break; /* 'mb' was freed! */
559 }
560 } 523 }
561 consider_gathering (); 524 consider_gathering ();
562} 525}
@@ -580,9 +543,11 @@ gather_migration_blocks (void *cls)
580 "Asking datastore for content for replication (queue size: %u)\n", 543 "Asking datastore for content for replication (queue size: %u)\n",
581 mig_size); 544 mig_size);
582 value_found = GNUNET_NO; 545 value_found = GNUNET_NO;
583 mig_qe = 546 mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
584 GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, 547 0,
585 &process_migration_content, NULL); 548 UINT_MAX,
549 &process_migration_content,
550 NULL);
586 if (NULL == mig_qe) 551 if (NULL == mig_qe)
587 consider_gathering (); 552 consider_gathering ();
588} 553}
@@ -640,19 +605,11 @@ GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
640 break; 605 break;
641 if (NULL == pos) 606 if (NULL == pos)
642 return; 607 return;
608 if (NULL != pos->env)
609 GNUNET_MQ_send_cancel (pos->env);
643 GNUNET_CONTAINER_DLL_remove (peer_head, 610 GNUNET_CONTAINER_DLL_remove (peer_head,
644 peer_tail, 611 peer_tail,
645 pos); 612 pos);
646 if (NULL != pos->th)
647 {
648 GSF_peer_transmit_cancel_ (pos->th);
649 pos->th = NULL;
650 }
651 if (NULL != pos->msg)
652 {
653 GNUNET_free (pos->msg);
654 pos->msg = NULL;
655 }
656 GNUNET_free (pos); 613 GNUNET_free (pos);
657} 614}
658 615
@@ -664,16 +621,21 @@ void
664GSF_push_init_ () 621GSF_push_init_ ()
665{ 622{
666 enabled = 623 enabled =
667 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); 624 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
625 "FS",
626 "CONTENT_PUSHING");
668 if (GNUNET_YES != enabled) 627 if (GNUNET_YES != enabled)
669 return; 628 return;
670 629
671 if (GNUNET_OK != 630 if (GNUNET_OK !=
672 GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", 631 GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
632 "fs",
633 "MIN_MIGRATION_DELAY",
673 &min_migration_delay)) 634 &min_migration_delay))
674 { 635 {
675 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, 636 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
676 "fs", "MIN_MIGRATION_DELAY", 637 "fs",
638 "MIN_MIGRATION_DELAY",
677 _("time required, content pushing disabled")); 639 _("time required, content pushing disabled"));
678 return; 640 return;
679 } 641 }