aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.h30
-rw-r--r--src/fs/gnunet-service-fs_cp.c446
-rw-r--r--src/fs/gnunet-service-fs_cp.h27
-rw-r--r--src/fs/gnunet-service-fs_lc.c64
-rw-r--r--src/fs/gnunet-service-fs_pr.c807
-rw-r--r--src/fs/gnunet-service-fs_pr.h62
-rw-r--r--src/fs/gnunet-service-fs_push.c475
-rw-r--r--src/fs/gnunet-service-fs_put.c197
8 files changed, 2030 insertions, 78 deletions
diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h
index ca8d4cdc5..52579188d 100644
--- a/src/fs/gnunet-service-fs.h
+++ b/src/fs/gnunet-service-fs.h
@@ -31,18 +31,44 @@
31 */ 31 */
32struct GSF_ConnectedPeer; 32struct GSF_ConnectedPeer;
33 33
34
35/** 34/**
36 * An active request. 35 * An active request.
37 */ 36 */
38struct GSF_PendingRequest; 37struct GSF_PendingRequest;
39 38
40
41/** 39/**
42 * A local client. 40 * A local client.
43 */ 41 */
44struct GSF_LocalClient; 42struct GSF_LocalClient;
45 43
46 44
45/**
46 * Our connection to the datastore.
47 */
48extern struct GNUNET_DATASTORE_Handle *GSF_dsh;
49
50/**
51 * Our configuration.
52 */
53extern const struct GNUNET_CONFIGURATION_Handle *GSF_cfg;
54
55/**
56 * Handle for reporting statistics.
57 */
58extern struct GNUNET_STATISTICS_Handle *GSF_stats;
59
60/**
61 * Pointer to handle to the core service (points to NULL until we've
62 * connected to it).
63 */
64extern struct GNUNET_CORE_Handle *GSF_core;
65
66/**
67 * Handle for DHT operations.
68 */
69static struct GNUNET_DHT_Handle *GSF_dht;
70
71
72
47#endif 73#endif
48/* end of gnunet-service-fs.h */ 74/* end of gnunet-service-fs.h */
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 2361cd4fc..903549cb7 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -141,11 +141,6 @@ struct GSF_ConnectedPeer
141 uint64_t inc_preference; 141 uint64_t inc_preference;
142 142
143 /** 143 /**
144 * Trust rating for this peer
145 */
146 uint32_t trust;
147
148 /**
149 * Trust rating for this peer on disk. 144 * Trust rating for this peer on disk.
150 */ 145 */
151 uint32_t disk_trust; 146 uint32_t disk_trust;
@@ -249,6 +244,19 @@ update_atsi (struct GSF_ConnectedPeer *cp,
249 244
250 245
251/** 246/**
247 * Return the performance data record for the given peer
248 *
249 * @param cp peer to query
250 * @return performance data record for the peer
251 */
252struct GSF_PeerPerformanceData *
253GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
254{
255 return &cp->ppd;
256}
257
258
259/**
252 * Core is ready to transmit to a peer, get the message. 260 * Core is ready to transmit to a peer, get the message.
253 * 261 *
254 * @param cls the 'struct GSF_PeerTransmitHandle' of the message 262 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
@@ -420,13 +428,95 @@ GSF_handle_p2p_migration_stop_ (void *cls,
420 GNUNET_break (0); 428 GNUNET_break (0);
421 return GNUNET_OK; 429 return GNUNET_OK;
422 } 430 }
423 cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); 431 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
424 update_atsi (cp, atsi); 432 update_atsi (cp, atsi);
425 return GNUNET_OK; 433 return GNUNET_OK;
426} 434}
427 435
428 436
429/** 437/**
438 * Handle a reply to a pending request. Also called if a request
439 * expires (then with data == NULL). The handler may be called
440 * many times (depending on the request type), but will not be
441 * called during or after a call to GSF_pending_request_cancel
442 * and will also not be called anymore after a call signalling
443 * expiration.
444 *
445 * @param cls user-specified closure
446 * @param pr handle to the original pending request
447 * @param data response data, NULL on request expiration
448 * @param data_len number of bytes in data
449 */
450static void
451handle_p2p_reply (void *cls,
452 struct GSF_PendingRequest *pr,
453 const void *data,
454 size_t data_len)
455{
456#if SUPPORT_DELAYS
457 struct GNUNET_TIME_Relative art_delay;
458#endif
459
460 /* FIXME: adapt code fragments below to new API! */
461
462
463 /* reply will go over the network, check for cover traffic */
464 if ( (prq->anonymity_level > 1) &&
465 (cover_content_count < prq->anonymity_level - 1) )
466 {
467 /* insufficient cover traffic, skip */
468 GNUNET_STATISTICS_update (stats,
469 gettext_noop ("# replies suppressed due to lack of cover traffic"),
470 1,
471 GNUNET_NO);
472 return GNUNET_YES;
473 }
474 if (prq->anonymity_level > 1)
475 cover_content_count -= prq->anonymity_level - 1;
476
477
478 cp = pr->cp;
479#if DEBUG_FS
480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481 "Transmitting result for query `%s' to other peer (PID=%u)\n",
482 GNUNET_h2s (key),
483 (unsigned int) cp->pid);
484#endif
485 GNUNET_STATISTICS_update (stats,
486 gettext_noop ("# replies received for other peers"),
487 1,
488 GNUNET_NO);
489 msize = sizeof (struct PutMessage) + prq->size;
490 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
491 reply->cont = &transmit_reply_continuation;
492 reply->cont_cls = pr;
493#if SUPPORT_DELAYS
494 art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
495 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
496 TTL_DECREMENT));
497 reply->delay_until
498 = GNUNET_TIME_relative_to_absolute (art_delay);
499 GNUNET_STATISTICS_update (stats,
500 gettext_noop ("cummulative artificial delay introduced (ms)"),
501 art_delay.abs_value,
502 GNUNET_NO);
503#endif
504 reply->msize = msize;
505 reply->priority = UINT32_MAX; /* send replies first! */
506 pm = (struct PutMessage*) &reply[1];
507 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
508 pm->header.size = htons (msize);
509 pm->type = htonl (prq->type);
510 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
511 memcpy (&pm[1], prq->data, prq->size);
512 add_to_pending_messages_for_peer (cp, reply, pr);
513
514
515}
516
517
518
519/**
430 * Handle P2P "QUERY" message. 520 * Handle P2P "QUERY" message.
431 * 521 *
432 * @param other the other peer involved (sender or receiver, NULL 522 * @param other the other peer involved (sender or receiver, NULL
@@ -438,9 +528,310 @@ struct GSF_PendingRequest *
438GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 528GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
439 const struct GNUNET_MessageHeader *message) 529 const struct GNUNET_MessageHeader *message)
440{ 530{
531 /* FIXME: adapt old code to new API! */
532 struct PendingRequest *pr;
533 struct ConnectedPeer *cp;
534 struct ConnectedPeer *cps;
535 struct CheckDuplicateRequestClosure cdc;
536 struct GNUNET_TIME_Relative timeout;
537 uint16_t msize;
538 const struct GetMessage *gm;
539 unsigned int bits;
540 const GNUNET_HashCode *opt;
541 uint32_t bm;
542 size_t bfsize;
543 uint32_t ttl_decrement;
544 int32_t priority;
545 enum GNUNET_BLOCK_Type type;
546 int have_ns;
547
548 msize = ntohs(message->size);
549 if (msize < sizeof (struct GetMessage))
550 {
551 GNUNET_break_op (0);
552 return GNUNET_SYSERR;
553 }
554 gm = (const struct GetMessage*) message;
555#if DEBUG_FS
556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557 "Received request for `%s'\n",
558 GNUNET_h2s (&gm->query));
559#endif
560 type = ntohl (gm->type);
561 bm = ntohl (gm->hash_bitmap);
562 bits = 0;
563 while (bm > 0)
564 {
565 if (1 == (bm & 1))
566 bits++;
567 bm >>= 1;
568 }
569 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
570 {
571 GNUNET_break_op (0);
572 return GNUNET_SYSERR;
573 }
574 opt = (const GNUNET_HashCode*) &gm[1];
575 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
576 /* bfsize must be power of 2, check! */
577 if (0 != ( (bfsize - 1) & bfsize))
578 {
579 GNUNET_break_op (0);
580 return GNUNET_SYSERR;
581 }
582 cover_query_count++;
583 bm = ntohl (gm->hash_bitmap);
584 bits = 0;
585 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
586 &other->hashPubKey);
587 if (NULL == cps)
588 {
589 /* peer must have just disconnected */
590 GNUNET_STATISTICS_update (stats,
591 gettext_noop ("# requests dropped due to initiator not being connected"),
592 1,
593 GNUNET_NO);
594 return GNUNET_SYSERR;
595 }
596 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
597 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
598 &opt[bits++]);
599 else
600 cp = cps;
601 if (cp == NULL)
602 {
603#if DEBUG_FS
604 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
607 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
608
609 else
610 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
611 "Failed to find peer `%4s' in connection set. Dropping query.\n",
612 GNUNET_i2s (other));
613#endif
614 GNUNET_STATISTICS_update (stats,
615 gettext_noop ("# requests dropped due to missing reverse route"),
616 1,
617 GNUNET_NO);
618 /* FIXME: try connect? */
619 return GNUNET_OK;
620 }
621 /* note that we can really only check load here since otherwise
622 peers could find out that we are overloaded by not being
623 disconnected after sending us a malformed query... */
624 priority = bound_priority (ntohl (gm->priority), cps);
625 if (priority < 0)
626 {
627#if DEBUG_FS
628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629 "Dropping query from `%s', this peer is too busy.\n",
630 GNUNET_i2s (other));
631#endif
632 return GNUNET_OK;
633 }
634#if DEBUG_FS
635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
637 GNUNET_h2s (&gm->query),
638 (unsigned int) type,
639 GNUNET_i2s (other),
640 (unsigned int) bm);
641#endif
642 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
643 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
644 (have_ns ? sizeof(GNUNET_HashCode) : 0));
645 if (have_ns)
646 {
647 pr->namespace = (GNUNET_HashCode*) &pr[1];
648 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
649 }
650 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
651 (GNUNET_LOAD_get_average (cp->transmission_delay) >
652 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
653 {
654 /* don't have BW to send to peer, or would likely take longer than we have for it,
655 so at best indirect the query */
656 priority = 0;
657 pr->forward_only = GNUNET_YES;
658 }
659 pr->type = type;
660 pr->mingle = ntohl (gm->filter_mutator);
661 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
662 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
663 pr->anonymity_level = 1;
664 pr->priority = (uint32_t) priority;
665 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
666 pr->query = gm->query;
667 /* decrement ttl (always) */
668 ttl_decrement = 2 * TTL_DECREMENT +
669 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
670 TTL_DECREMENT);
671 if ( (pr->ttl < 0) &&
672 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
673 {
674#if DEBUG_FS
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
677 GNUNET_i2s (other),
678 pr->ttl,
679 ttl_decrement);
680#endif
681 GNUNET_STATISTICS_update (stats,
682 gettext_noop ("# requests dropped due TTL underflow"),
683 1,
684 GNUNET_NO);
685 /* integer underflow => drop (should be very rare)! */
686 GNUNET_free (pr);
687 return GNUNET_OK;
688 }
689 pr->ttl -= ttl_decrement;
690 pr->start_time = GNUNET_TIME_absolute_get ();
691
692 /* get bloom filter */
693 if (bfsize > 0)
694 {
695 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
696 bfsize,
697 BLOOMFILTER_K);
698 pr->bf_size = bfsize;
699 }
700 cdc.have = NULL;
701 cdc.pr = pr;
702 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
703 &gm->query,
704 &check_duplicate_request_peer,
705 &cdc);
706 if (cdc.have != NULL)
707 {
708 if (cdc.have->start_time.abs_value + cdc.have->ttl >=
709 pr->start_time.abs_value + pr->ttl)
710 {
711 /* existing request has higher TTL, drop new one! */
712 cdc.have->priority += pr->priority;
713 destroy_pending_request (pr);
714#if DEBUG_FS
715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716 "Have existing request with higher TTL, dropping new request.\n",
717 GNUNET_i2s (other));
718#endif
719 GNUNET_STATISTICS_update (stats,
720 gettext_noop ("# requests dropped due to higher-TTL request"),
721 1,
722 GNUNET_NO);
723 return GNUNET_OK;
724 }
725 else
726 {
727 /* existing request has lower TTL, drop old one! */
728 pr->priority += cdc.have->priority;
729 /* Possible optimization: if we have applicable pending
730 replies in 'cdc.have', we might want to move those over
731 (this is a really rare special-case, so it is not clear
732 that this would be worth it) */
733 destroy_pending_request (cdc.have);
734 /* keep processing 'pr'! */
735 }
736 }
737
738 pr->cp = cp;
739 GNUNET_break (GNUNET_OK ==
740 GNUNET_CONTAINER_multihashmap_put (query_request_map,
741 &gm->query,
742 pr,
743 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
744 GNUNET_break (GNUNET_OK ==
745 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
746 &other->hashPubKey,
747 pr,
748 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
749
750 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
751 pr,
752 pr->start_time.abs_value + pr->ttl);
753
754 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# P2P searches received"),
756 1,
757 GNUNET_NO);
758 GNUNET_STATISTICS_update (stats,
759 gettext_noop ("# P2P searches active"),
760 1,
761 GNUNET_NO);
762
763 /* calculate change in traffic preference */
764 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
765 /* process locally */
766 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
767 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
768 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
769 (pr->priority + 1));
770 if (GNUNET_YES != pr->forward_only)
771 {
772#if DEBUG_FS
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "Handing request for `%s' to datastore\n",
775 GNUNET_h2s (&gm->query));
776#endif
777 pr->qe = GNUNET_DATASTORE_get (dsh,
778 &gm->query,
779 type,
780 pr->priority + 1,
781 MAX_DATASTORE_QUEUE,
782 timeout,
783 &process_local_reply,
784 pr);
785 if (NULL == pr->qe)
786 {
787 GNUNET_STATISTICS_update (stats,
788 gettext_noop ("# requests dropped by datastore (queue length limit)"),
789 1,
790 GNUNET_NO);
791 }
792 }
793 else
794 {
795 GNUNET_STATISTICS_update (stats,
796 gettext_noop ("# requests forwarded due to high load"),
797 1,
798 GNUNET_NO);
799 }
800
801 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
802 switch (pr->type)
803 {
804 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
805 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
806 /* only one result, wait for datastore */
807 if (GNUNET_YES != pr->forward_only)
808 {
809 GNUNET_STATISTICS_update (stats,
810 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
811 1,
812 GNUNET_NO);
813 break;
814 }
815 default:
816 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
817 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
818 pr);
819 }
820
821 /* make sure we don't track too many requests */
822 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
823 {
824 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
825 GNUNET_assert (pr != NULL);
826 destroy_pending_request (pr);
827 }
828 return GNUNET_OK;
829
830
831
441 // FIXME! 832 // FIXME!
442 // parse request 833 // parse request
443 // setup pending request 834 // setup pending request (use 'handle_p2p_reply')
444 // track pending request to cancel it on peer disconnect (!) 835 // track pending request to cancel it on peer disconnect (!)
445 // return it! 836 // return it!
446 // (actual planning & execution up to caller!) 837 // (actual planning & execution up to caller!)
@@ -815,6 +1206,41 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
815 1206
816 1207
817/** 1208/**
1209 * Ask a peer to stop migrating data to us until the given point
1210 * in time.
1211 *
1212 * @param cp peer to ask
1213 * @param block_time until when to block
1214 */
1215void
1216GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1217 struct GNUNET_TIME_Relative block_time)
1218{
1219 struct PendingMessage *pm;
1220 struct MigrationStopMessage *msm;
1221
1222 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1223 return; /* already blocked */
1224 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1225
1226 /* FIXME: adapt old code below to new API! */
1227 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
1228 sizeof (struct MigrationStopMessage));
1229 pm->msize = sizeof (struct MigrationStopMessage);
1230 pm->priority = UINT32_MAX;
1231 msm = (struct MigrationStopMessage*) &pm[1];
1232 msm->header.size = htons (sizeof (struct MigrationStopMessage));
1233 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1234 msm->duration = GNUNET_TIME_relative_hton (block_time);
1235 add_to_pending_messages_for_peer (cp,
1236 pm,
1237 NULL);
1238}
1239
1240
1241
1242
1243/**
818 * Write host-trust information to a file - flush the buffer entry! 1244 * Write host-trust information to a file - flush the buffer entry!
819 * 1245 *
820 * @param cls closure, not used 1246 * @param cls closure, not used
@@ -965,9 +1391,9 @@ GSF_connected_peer_done_ ()
965 * @return GNUNET_YES (we should continue to iterate) 1391 * @return GNUNET_YES (we should continue to iterate)
966 */ 1392 */
967static int 1393static int
968clean_peer (void *cls, 1394clean_local_client (void *cls,
969 const GNUNET_HashCode * key, 1395 const GNUNET_HashCode * key,
970 void *value) 1396 void *value)
971{ 1397{
972 const struct GSF_LocalClient *lc = cls; 1398 const struct GSF_LocalClient *lc = cls;
973 struct GSF_ConnectedPeer *cp = value; 1399 struct GSF_ConnectedPeer *cp = value;
diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h
index 9bf36186c..f08e31a72 100644
--- a/src/fs/gnunet-service-fs_cp.h
+++ b/src/fs/gnunet-service-fs_cp.h
@@ -90,6 +90,11 @@ struct GSF_PeerPerformanceData
90 double avg_priority; 90 double avg_priority;
91 91
92 /** 92 /**
93 * Trust rating for this peer
94 */
95 uint32_t trust;
96
97 /**
93 * Number of pending queries (replies are not counted) 98 * Number of pending queries (replies are not counted)
94 */ 99 */
95 unsigned int pending_queries; 100 unsigned int pending_queries;
@@ -265,6 +270,28 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
265 270
266 271
267/** 272/**
273 * Return the performance data record for the given peer
274 *
275 * @param cp peer to query
276 * @return performance data record for the peer
277 */
278struct GSF_PeerPerformanceData *
279GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp);
280
281
282/**
283 * Ask a peer to stop migrating data to us until the given point
284 * in time.
285 *
286 * @param cp peer to ask
287 * @param block_time until when to block
288 */
289void
290GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
291 struct GNUNET_TIME_Relative block_time);
292
293
294/**
268 * A peer disconnected from us. Tear down the connected peer 295 * A peer disconnected from us. Tear down the connected peer
269 * record. 296 * record.
270 * 297 *
diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c
index 9c9c0d568..ea33580f9 100644
--- a/src/fs/gnunet-service-fs_lc.c
+++ b/src/fs/gnunet-service-fs_lc.c
@@ -179,6 +179,70 @@ GSF_local_client_lookup_ (struct GNUNET_SERVER_Client *client)
179 179
180 180
181/** 181/**
182 * Handle a reply to a pending request. Also called if a request
183 * expires (then with data == NULL). The handler may be called
184 * many times (depending on the request type), but will not be
185 * called during or after a call to GSF_pending_request_cancel
186 * and will also not be called anymore after a call signalling
187 * expiration.
188 *
189 * @param cls user-specified closure
190 * @param pr handle to the original pending request
191 * @param data response data, NULL on request expiration
192 * @param data_len number of bytes in data
193 */
194static void
195client_response_handler (void *cls,
196 struct GSF_PendingRequest *pr,
197 const void *data,
198 size_t data_len)
199{
200 /* FIXME: adapt old code below to new API! */
201
202 GNUNET_STATISTICS_update (stats,
203 gettext_noop ("# replies received for local clients"),
204 1,
205 GNUNET_NO);
206 cl = pr->client_request_list->client_list;
207 msize = sizeof (struct PutMessage) + prq->size;
208 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
209 creply->msize = msize;
210 creply->client_list = cl;
211 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
212 cl->res_tail,
213 cl->res_tail,
214 creply);
215 pm = (struct PutMessage*) &creply[1];
216 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
217 pm->header.size = htons (msize);
218 pm->type = htonl (prq->type);
219 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
220 memcpy (&pm[1], prq->data, prq->size);
221 if (NULL == cl->th)
222 {
223#if DEBUG_FS
224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
225 "Transmitting result for query `%s' to client\n",
226 GNUNET_h2s (key));
227#endif
228 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
229 msize,
230 GNUNET_TIME_UNIT_FOREVER_REL,
231 &transmit_to_client,
232 cl);
233 }
234 GNUNET_break (cl->th != NULL);
235 if (pr->do_remove)
236 {
237 prq->finished = GNUNET_YES;
238 destroy_pending_request (pr);
239 }
240
241}
242
243
244
245/**
182 * Handle START_SEARCH-message (search request from local client). 246 * Handle START_SEARCH-message (search request from local client).
183 * 247 *
184 * @param client identification of the client 248 * @param client identification of the client
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index ad19a807e..047c07587 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -37,16 +37,39 @@ struct GSF_PendingRequest
37 */ 37 */
38 struct GSF_PendingRequestData public_data; 38 struct GSF_PendingRequestData public_data;
39 39
40 /**
41 * Function to call if we encounter a reply.
42 */
40 GSF_PendingRequestReplyHandler rh; 43 GSF_PendingRequestReplyHandler rh;
41 44
45 /**
46 * Closure for 'rh'
47 */
42 void *rh_cls; 48 void *rh_cls;
43 49
44 const GNUNET_HashCode *replies_seen; 50 /**
51 * Array of hash codes of replies we've already seen.
52 */
53 GNUNET_HashCode *replies_seen;
45 54
55 /**
56 * Bloomfilter masking replies we've already seen.
57 */
46 struct GNUNET_CONTAINER_BloomFilter *bf; 58 struct GNUNET_CONTAINER_BloomFilter *bf;
47 59
60 /**
61 * Number of valid entries in the 'replies_seen' array.
62 */
48 unsigned int replies_seen_count; 63 unsigned int replies_seen_count;
49 64
65 /**
66 * Length of the 'replies_seen' array.
67 */
68 unsigned int replies_seen_size;
69
70 /**
71 * Mingle value we currently use for the bf.
72 */
50 int32_t mingle; 73 int32_t mingle;
51 74
52}; 75};
@@ -56,7 +79,99 @@ struct GSF_PendingRequest
56 * All pending requests, ordered by the query. Entries 79 * All pending requests, ordered by the query. Entries
57 * are of type 'struct GSF_PendingRequest*'. 80 * are of type 'struct GSF_PendingRequest*'.
58 */ 81 */
59static struct GNUNET_CONTAINER_MultiHashMap *requests; 82static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
83
84
85/**
86 * Datastore 'PUT' load tracking.
87 */
88static struct GNUNET_LOAD_Value *datastore_put_load;
89
90
91/**
92 * Are we allowed to migrate content to this peer.
93 */
94static int active_to_migration;
95
96
97/**
98 * Heap with the request that will expire next at the top. Contains
99 * pointers of type "struct PendingRequest*"; these will *also* be
100 * aliased from the "requests_by_peer" data structures and the
101 * "requests_by_query" table. Note that requests from our clients
102 * don't expire and are thus NOT in the "requests_by_expiration"
103 * (or the "requests_by_peer" tables).
104 */
105static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
106
107
108/**
109 * How many bytes should a bloomfilter be if we have already seen
110 * entry_count responses? Note that BLOOMFILTER_K gives us the number
111 * of bits set per entry. Furthermore, we should not re-size the
112 * filter too often (to keep it cheap).
113 *
114 * Since other peers will also add entries but not resize the filter,
115 * we should generally pick a slightly larger size than what the
116 * strict math would suggest.
117 *
118 * @return must be a power of two and smaller or equal to 2^15.
119 */
120static size_t
121compute_bloomfilter_size (unsigned int entry_count)
122{
123 size_t size;
124 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
125 uint16_t max = 1 << 15;
126
127 if (entry_count > max)
128 return max;
129 size = 8;
130 while ((size < max) && (size < ideal))
131 size *= 2;
132 if (size > max)
133 return max;
134 return size;
135}
136
137
138/**
139 * Recalculate our bloom filter for filtering replies. This function
140 * will create a new bloom filter from scratch, so it should only be
141 * called if we have no bloomfilter at all (and hence can create a
142 * fresh one of minimal size without problems) OR if our peer is the
143 * initiator (in which case we may resize to larger than mimimum size).
144 *
145 * @param pr request for which the BF is to be recomputed
146 * @return GNUNET_YES if a refresh actually happened
147 */
148static int
149refresh_bloomfilter (struct GSF_PendingRequest *pr)
150{
151 unsigned int i;
152 size_t nsize;
153 GNUNET_HashCode mhash;
154
155 nsize = compute_bloomfilter_size (pr->replies_seen_off);
156 if ( (bf != NULL) &&
157 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
158 return GNUNET_NO; /* size not changed */
159 if (pr->bf != NULL)
160 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
162 UINT32_MAX);
163 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
164 nsize,
165 BLOOMFILTER_K);
166 for (i=0;i<pr->replies_seen_count;i++)
167 {
168 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
169 pr->mingle,
170 &mhash);
171 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
172 }
173 return GNUNET_YES;
174}
60 175
61 176
62/** 177/**
@@ -92,7 +207,54 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
92 GSF_PendingRequestReplyHandler rh, 207 GSF_PendingRequestReplyHandler rh,
93 void *rh_cls) 208 void *rh_cls)
94{ 209{
95 return NULL; // FIXME 210 struct GSF_PendingRequest *pr;
211
212
213 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
214 pr->public_data.query = *query;
215 if (GNUNET_BLOCK_TYPE_SBLOCK == type)
216 {
217 GNUNET_assert (NULL != namespace);
218 pr->public_data.namespace = *namespace;
219 }
220 if (NULL != target)
221 {
222 pr->public_data.target = *target;
223 pr->has_target = GNUNET_YES;
224 }
225 pr->public_data.anonymity_level = anonymity_data;
226 pr->public_data.priority = priority;
227 pr->public_data.options = options;
228 pr->public_data.type = type;
229 pr->rh = rh;
230 pr->rh_cls = rh_cls;
231 if (replies_seen_count > 0)
232 {
233 pr->replies_seen_size = replies_seen_count;
234 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
235 memcpy (pr->replies_seen,
236 replies_seen,
237 replies_seen_count * sizeof (struct GNUNET_HashCode));
238 pr->replies_seen_count = replies_seen_count;
239 }
240 if (NULL != bf)
241 {
242 pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
243 pr->mingle = mingle;
244 }
245 else if ( (replies_seen_count > 0) &&
246 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
247 {
248 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
249 }
250 GNUNET_CONTAINER_multihashmap_put (pr_map,
251 query,
252 pr,
253 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
254 // FIXME: if not a local query, we also need to track the
255 // total number of external queries we currently have and
256 // bound it => need an additional heap!
257 return pr;
96} 258}
97 259
98 260
@@ -109,34 +271,54 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
109 const GNUNET_HashCode *replies_seen, 271 const GNUNET_HashCode *replies_seen,
110 unsigned int replies_seen_count) 272 unsigned int replies_seen_count)
111{ 273{
112 // FIXME 274 unsigned int i;
113} 275 GNUNET_HashCode mhash;
114 276
115 277 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
116 278 return; /* integer overflow */
117/** 279 if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
118 * Get the query for a given pending request. 280 {
119 * 281 /* we're responsible for the BF, full refresh */
120 * @param pr the request 282 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
121 * @return pointer to the query (only valid as long as pr is valid) 283 GNUNET_array_grow (pr->replies_seen,
122 */ 284 pr->replies_seen_size,
123const GNUNET_HashCode * 285 replies_seen_count + pr->replies_seen_count);
124GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr) 286 memcpy (&pr->replies_seen[pr->replies_seen_count],
125{ 287 replies_seen,
126 return NULL; // FIXME 288 sizeof (GNUNET_HashCode) * replies_seen_count);
127} 289 pr->replies_seen_count += replies_seen;
128 290 if (GNUNET_NO == refresh_bloomfilter (pr))
129 291 {
130/** 292 /* bf not recalculated, simply extend it with new bits */
131 * Get the type of a given pending request. 293 for (i=0;i<pr->replies_seen_count;i++)
132 * 294 {
133 * @param pr the request 295 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
134 * @return query type 296 pr->mingle,
135 */ 297 &mhash);
136enum GNUNET_BLOCK_Type 298 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
137GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr) 299 }
138{ 300 }
139 return 0; // FIXME 301 }
302 else
303 {
304 if (NULL == pr->bf)
305 {
306 /* we're not the initiator, but the initiator did not give us
307 any bloom-filter, so we need to create one on-the-fly */
308 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
309 UINT32_MAX);
310 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
311 pr->mingle,
312 BLOOMFILTER_K);
313 }
314 for (i=0;i<pr->replies_seen_count;i++)
315 {
316 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
317 pr->mingle,
318 &mhash);
319 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
320 }
321 }
140} 322}
141 323
142 324
@@ -145,16 +327,102 @@ GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr)
145 * transmission to other peers (or at least determine its size). 327 * transmission to other peers (or at least determine its size).
146 * 328 *
147 * @param pr request to generate the message for 329 * @param pr request to generate the message for
330 * @param do_route are we routing the reply
148 * @param buf_size number of bytes available in buf 331 * @param buf_size number of bytes available in buf
149 * @param buf where to copy the message (can be NULL) 332 * @param buf where to copy the message (can be NULL)
150 * @return number of bytes needed (if > buf_size) or used 333 * @return number of bytes needed (if > buf_size) or used
151 */ 334 */
152size_t 335size_t
153GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 336GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
337 int do_route,
154 size_t buf_size, 338 size_t buf_size,
155 void *buf) 339 void *buf)
156{ 340{
157 return 0; // FIXME 341 struct PendingMessage *pm;
342 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
343 struct GetMessage *gm;
344 GNUNET_HashCode *ext;
345 size_t msize;
346 unsigned int k;
347 int no_route;
348 uint32_t bm;
349 uint32_t prio;
350 size_t bf_size;
351
352 k = 0;
353 bm = 0;
354 if (GNUNET_YES != do_route)
355 {
356 bm |= GET_MESSAGE_BIT_RETURN_TO;
357 k++;
358 }
359 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
360 {
361 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
362 k++;
363 }
364 if (GNUNET_YES == pr->has_target)
365 {
366 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
367 k++;
368 }
369 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
370 msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
371 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
372 if (buf_size < msize)
373 return msize;
374 gm = (struct GetMessage*) lbuf;
375 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
376 gm->header.size = htons (msize);
377 gm->type = htonl (pr->type);
378 if (GNUNET_YES == do_route)
379 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
380 pr->public_data.priority + 1);
381 else
382 prio = 0;
383 pr->public_data.priority -= prio;
384 gm->priority = htonl (prio);
385 gm->ttl = htonl (pr->ttl);
386 gm->filter_mutator = htonl(pr->mingle);
387 gm->hash_bitmap = htonl (bm);
388 gm->query = pr->query;
389 ext = (GNUNET_HashCode*) &gm[1];
390 k = 0;
391 if (GNUNET_YES != do_route)
392 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
393 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
394 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
395 if (GNUNET_YES == pr->has_target)
396 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
397 if (pr->bf != NULL)
398 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
399 (char*) &ext[k],
400 bf_size);
401 memcpy (buf, gm, msize);
402 return msize;
403}
404
405
406/**
407 * Iterator to free pending requests.
408 *
409 * @param cls closure, unused
410 * @param key current key code
411 * @param value value in the hash map (pending request)
412 * @return GNUNET_YES (we should continue to iterate)
413 */
414static int
415clean_request (void *cls,
416 const GNUNET_HashCode * key,
417 void *value)
418{
419 struct GSF_PendingRequest *pr = value;
420
421 GNUNET_free_non_null (pr->replies_seen);
422 if (NULL != pr->bf)
423 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
424 GNUNET_free (pr);
425 return GNUNET_YES;
158} 426}
159 427
160 428
@@ -166,6 +434,12 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
166void 434void
167GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr) 435GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
168{ 436{
437 GNUNET_assert (GNUNET_OK ==
438 GNUNET_CONTAINER_multihashmap_remove (pr_map,
439 &pr->public_data.query,
440 pr));
441 GNUNET_assert (GNUNET_YES ==
442 clean_request (NULL, &pr->public_data.query, pr));
169} 443}
170 444
171 445
@@ -176,10 +450,369 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
176 * @param cls closure for it 450 * @param cls closure for it
177 */ 451 */
178void 452void
179GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, 453GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
180 void *cls) 454 void *cls)
455{
456 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
457 (GNUNET_CONTAINER_HashMapIterator) it,
458 cls);
459}
460
461
462
463
464/**
465 * Closure for "process_reply" function.
466 */
467struct ProcessReplyClosure
468{
469 /**
470 * The data for the reply.
471 */
472 const void *data;
473
474 /**
475 * Who gave us this reply? NULL for local host (or DHT)
476 */
477 struct ConnectedPeer *sender;
478
479 /**
480 * When the reply expires.
481 */
482 struct GNUNET_TIME_Absolute expiration;
483
484 /**
485 * Size of data.
486 */
487 size_t size;
488
489 /**
490 * Type of the block.
491 */
492 enum GNUNET_BLOCK_Type type;
493
494 /**
495 * How much was this reply worth to us?
496 */
497 uint32_t priority;
498
499 /**
500 * Anonymity requirements for this reply.
501 */
502 uint32_t anonymity_level;
503
504 /**
505 * Evaluation result (returned).
506 */
507 enum GNUNET_BLOCK_EvaluationResult eval;
508
509 /**
510 * Did we finish processing the associated request?
511 */
512 int finished;
513
514 /**
515 * Did we find a matching request?
516 */
517 int request_found;
518};
519
520
521/**
522 * Update the performance data for the sender (if any) since
523 * the sender successfully answered one of our queries.
524 *
525 * @param prq information about the sender
526 * @param pr request that was satisfied
527 */
528static void
529update_request_performance_data (struct ProcessReplyClosure *prq,
530 struct GSF_PendingRequest *pr)
531{
532 unsigned int i;
533 struct GNUNET_TIME_Relative cur_delay;
534
535 if (prq->sender == NULL)
536 return;
537 /* FIXME: adapt code to new API... */
538 for (i=0;i<pr->used_targets_off;i++)
539 if (pr->used_targets[i].pid == prq->sender->pid)
540 break;
541 if (i < pr->used_targets_off)
542 {
543 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
544 prq->sender->avg_delay.rel_value
545 = (prq->sender->avg_delay.rel_value *
546 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
547 prq->sender->avg_priority
548 = (prq->sender->avg_priority *
549 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
550 }
551 if (pr->cp != NULL)
552 {
553 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
554 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
555 -1);
556 GNUNET_PEER_change_rc (pr->cp->pid, 1);
557 prq->sender->last_p2p_replies
558 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
559 = pr->cp->pid;
560 }
561 else
562 {
563 if (NULL != prq->sender->last_client_replies
564 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
565 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
566 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
567 prq->sender->last_client_replies
568 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
569 = pr->client_request_list->client_list->client;
570 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
571 }
572}
573
574
575
576/**
577 * We have received a reply; handle it!
578 *
579 * @param cls response (struct ProcessReplyClosure)
580 * @param key our query
581 * @param value value in the hash map (info about the query)
582 * @return GNUNET_YES (we should continue to iterate)
583 */
584static int
585process_reply (void *cls,
586 const GNUNET_HashCode * key,
587 void *value)
588{
589 struct ProcessReplyClosure *prq = cls;
590 struct GSF_PendingRequest *pr = value;
591 struct PendingMessage *reply;
592 struct ClientResponseMessage *creply;
593 struct ClientList *cl;
594 struct PutMessage *pm;
595 struct ConnectedPeer *cp;
596 size_t msize;
597
598#if DEBUG_FS
599 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600 "Matched result (type %u) for query `%s' with pending request\n",
601 (unsigned int) prq->type,
602 GNUNET_h2s (key));
603#endif
604 GNUNET_STATISTICS_update (stats,
605 gettext_noop ("# replies received and matched"),
606 1,
607 GNUNET_NO);
608 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
609 prq->type,
610 key,
611 &pr->bf,
612 pr->mingle,
613 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
614 prq->data,
615 prq->size);
616 switch (prq->eval)
617 {
618 case GNUNET_BLOCK_EVALUATION_OK_MORE:
619 update_request_performance_data (prq, pr);
620 break;
621 case GNUNET_BLOCK_EVALUATION_OK_LAST:
622 update_request_performance_data (prq, pr);
623 /* FIXME: adapt code to new API! */
624 while (NULL != pr->pending_head)
625 destroy_pending_message_list_entry (pr->pending_head);
626 if (pr->qe != NULL)
627 {
628 if (pr->client_request_list != NULL)
629 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
630 GNUNET_YES);
631 GNUNET_DATASTORE_cancel (pr->qe);
632 pr->qe = NULL;
633 }
634 pr->do_remove = GNUNET_YES;
635 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
636 {
637 GNUNET_SCHEDULER_cancel (pr->task);
638 pr->task = GNUNET_SCHEDULER_NO_TASK;
639 }
640 GNUNET_break (GNUNET_YES ==
641 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
642 key,
643 pr));
644 GNUNET_LOAD_update (rt_entry_lifetime,
645 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
646 break;
647 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
648 GNUNET_STATISTICS_update (stats,
649 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
650 1,
651 GNUNET_NO);
652#if DEBUG_FS && 0
653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654 "Duplicate response `%s', discarding.\n",
655 GNUNET_h2s (&mhash));
656#endif
657 return GNUNET_YES; /* duplicate */
658 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
659 return GNUNET_YES; /* wrong namespace */
660 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
661 GNUNET_break (0);
662 return GNUNET_YES;
663 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
664 GNUNET_break (0);
665 return GNUNET_YES;
666 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
667 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
668 _("Unsupported block type %u\n"),
669 prq->type);
670 return GNUNET_NO;
671 }
672 /* FIXME: adapt code to new API! */
673 if (pr->client_request_list != NULL)
674 {
675 if (pr->replies_seen_size == pr->replies_seen_off)
676 GNUNET_array_grow (pr->replies_seen,
677 pr->replies_seen_size,
678 pr->replies_seen_size * 2 + 4);
679 GNUNET_CRYPTO_hash (prq->data,
680 prq->size,
681 &pr->replies_seen[pr->replies_seen_off++]);
682 refresh_bloomfilter (pr);
683 }
684 if (NULL == prq->sender)
685 {
686#if DEBUG_FS
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "Found result for query `%s' in local datastore\n",
689 GNUNET_h2s (key));
690#endif
691 GNUNET_STATISTICS_update (stats,
692 gettext_noop ("# results found locally"),
693 1,
694 GNUNET_NO);
695 }
696 prq->priority += pr->remaining_priority;
697 pr->remaining_priority = 0;
698 pr->results_found++;
699 prq->request_found = GNUNET_YES;
700 /* finally, pass on to other peers / local clients */
701 pr->rh (pr->rh_cls, pr, prq->data, prq->size);
702 return GNUNET_YES;
703}
704
705
706/**
707 * Continuation called to notify client about result of the
708 * operation.
709 *
710 * @param cls closure
711 * @param success GNUNET_SYSERR on failure
712 * @param msg NULL on success, otherwise an error message
713 */
714static void
715put_migration_continuation (void *cls,
716 int success,
717 const char *msg)
718{
719 struct GNUNET_TIME_Absolute *start = cls;
720 struct GNUNET_TIME_Relative delay;
721
722 delay = GNUNET_TIME_absolute_get_duration (*start);
723 GNUNET_free (start);
724 /* FIXME: should we really update the load value on failure? */
725 GNUNET_LOAD_update (datastore_put_load,
726 delay.rel_value);
727 if (GNUNET_OK == success)
728 return;
729 GNUNET_STATISTICS_update (stats,
730 gettext_noop ("# datastore 'put' failures"),
731 1,
732 GNUNET_NO);
733}
734
735
736/**
737 * Test if the DATABASE (PUT) load on this peer is too high
738 * to even consider processing the query at
739 * all.
740 *
741 * @return GNUNET_YES if the load is too high to do anything (load high)
742 * GNUNET_NO to process normally (load normal or low)
743 */
744static int
745test_put_load_too_high (uint32_t priority)
746{
747 double ld;
748
749 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
750 return GNUNET_NO; /* very fast */
751 ld = GNUNET_LOAD_get_load (datastore_put_load);
752 if (ld < 2.0 * (1 + priority))
753 return GNUNET_NO;
754 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# storage requests dropped due to high load"),
756 1,
757 GNUNET_NO);
758 return GNUNET_YES;
759}
760
761
762/**
763 * Iterator called on each result obtained for a DHT
764 * operation that expects a reply
765 *
766 * @param cls closure
767 * @param exp when will this value expire
768 * @param key key of the result
769 * @param get_path NULL-terminated array of pointers
770 * to the peers on reverse GET path (or NULL if not recorded)
771 * @param put_path NULL-terminated array of pointers
772 * to the peers on the PUT path (or NULL if not recorded)
773 * @param type type of the result
774 * @param size number of bytes in data
775 * @param data pointer to the result data
776 */
777void
778GSF_handle_dht_reply_ (void *cls,
779 struct GNUNET_TIME_Absolute exp,
780 const GNUNET_HashCode * key,
781 const struct GNUNET_PeerIdentity * const *get_path,
782 const struct GNUNET_PeerIdentity * const *put_path,
783 enum GNUNET_BLOCK_Type type,
784 size_t size,
785 const void *data)
181{ 786{
182 // FIXME 787 struct GSF_PendingRequest *pr = cls;
788 struct ProcessReplyClosure prq;
789
790 memset (&prq, 0, sizeof (prq));
791 prq.data = data;
792 prq.expiration = exp;
793 prq.size = size;
794 prq.type = type;
795 process_reply (&prq, key, pr);
796 if ( (GNUNET_YES == active_to_migration) &&
797 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
798 {
799#if DEBUG_FS
800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801 "Replicating result for query `%s' with priority %u\n",
802 GNUNET_h2s (&query),
803 prq.priority);
804#endif
805 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
806 *start = GNUNET_TIME_absolute_get ();
807 GNUNET_DATASTORE_put (dsh,
808 0, &query, dsize, &put[1],
809 type, prq.priority, 1 /* anonymity */,
810 expiration,
811 1 + prq.priority, MAX_DATASTORE_QUEUE,
812 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
813 &put_migration_continuation,
814 start);
815 }
183} 816}
184 817
185 818
@@ -189,18 +822,106 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
189 * this content and possibly passes it on (to local clients or other 822 * this content and possibly passes it on (to local clients or other
190 * peers). Does NOT perform migration (content caching at this peer). 823 * peers). Does NOT perform migration (content caching at this peer).
191 * 824 *
192 * @param other the other peer involved (sender or receiver, NULL 825 * @param cp the other peer involved (sender or receiver, NULL
193 * for loopback messages where we are both sender and receiver) 826 * for loopback messages where we are both sender and receiver)
194 * @param message the actual message 827 * @param message the actual message
195 * @return how valueable was the content to us (0 for not at all), 828 * @return GNUNET_OK if the message was well-formed,
196 * GNUNET_SYSERR if the message was malformed (close connection, 829 * GNUNET_SYSERR if the message was malformed (close connection,
197 * do not cache under any circumstances) 830 * do not cache under any circumstances)
198 */ 831 */
199int 832int
200GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other, 833GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
201 const struct GNUNET_MessageHeader *message) 834 const struct GNUNET_MessageHeader *message)
202{ 835{
203 return GNUNET_SYSERR; // FIXME 836 const struct PutMessage *put;
837 uint16_t msize;
838 size_t dsize;
839 enum GNUNET_BLOCK_Type type;
840 struct GNUNET_TIME_Absolute expiration;
841 GNUNET_HashCode query;
842 struct ProcessReplyClosure prq;
843 struct GNUNET_TIME_Relative block_time;
844 double putl;
845 struct GNUNET_TIME_Absolute *start;
846
847 msize = ntohs (message->size);
848 if (msize < sizeof (struct PutMessage))
849 {
850 GNUNET_break_op(0);
851 return GNUNET_SYSERR;
852 }
853 put = (const struct PutMessage*) message;
854 dsize = msize - sizeof (struct PutMessage);
855 type = ntohl (put->type);
856 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
857 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
858 return GNUNET_SYSERR;
859 if (GNUNET_OK !=
860 GNUNET_BLOCK_get_key (block_ctx,
861 type,
862 &put[1],
863 dsize,
864 &query))
865 {
866 GNUNET_break_op (0);
867 return GNUNET_SYSERR;
868 }
869 /* now, lookup 'query' */
870 prq.data = (const void*) &put[1];
871 if (NULL != cp)
872 prq.sender = cp;
873 else
874 prq.sender = NULL;
875 prq.size = dsize;
876 prq.type = type;
877 prq.expiration = expiration;
878 prq.priority = 0;
879 prq.anonymity_level = 1;
880 prq.finished = GNUNET_NO;
881 prq.request_found = GNUNET_NO;
882 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
883 &query,
884 &process_reply,
885 &prq);
886 if (NULL != cp)
887 {
888 GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
889 GSF_get_peer_performance_data (cp)->trust += prq.priority;
890 }
891 if ( (GNUNET_YES == active_to_migration) &&
892 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
893 {
894#if DEBUG_FS
895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896 "Replicating result for query `%s' with priority %u\n",
897 GNUNET_h2s (&query),
898 prq.priority);
899#endif
900 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
901 *start = GNUNET_TIME_absolute_get ();
902 GNUNET_DATASTORE_put (dsh,
903 0, &query, dsize, &put[1],
904 type, prq.priority, 1 /* anonymity */,
905 expiration,
906 1 + prq.priority, MAX_DATASTORE_QUEUE,
907 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
908 &put_migration_continuation,
909 start);
910 }
911 putl = GNUNET_LOAD_get_load (datastore_put_load);
912 if ( (NULL != (cp = prq.sender)) &&
913 (GNUNET_NO == prq.request_found) &&
914 ( (GNUNET_YES != active_to_migration) ||
915 (putl > 2.5 * (1 + prq.priority)) ) )
916 {
917 if (GNUNET_YES != active_to_migration)
918 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
919 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
920 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
921 (unsigned int) (60000 * putl * putl)));
922 GSF_block_peer_migration (cp, block_time);
923 }
924 return GNUNET_OK;
204} 925}
205 926
206 927
@@ -210,7 +931,7 @@ GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other,
210void 931void
211GSF_pending_request_init_ () 932GSF_pending_request_init_ ()
212{ 933{
213 // FIXME 934 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
214} 935}
215 936
216 937
@@ -220,7 +941,11 @@ GSF_pending_request_init_ ()
220void 941void
221GSF_pending_request_done_ () 942GSF_pending_request_done_ ()
222{ 943{
223 // FIXME 944 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
945 &clean_request,
946 NULL);
947 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
948 pr_map = NULL;
224} 949}
225 950
226 951
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index 5fb9d2a5a..88c650042 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -68,7 +68,9 @@ enum GSF_PendingRequestOptions
68 68
69 69
70/** 70/**
71 * Public data associated with each pending request. 71 * Public data (in the sense of not encapsulated within
72 * 'gnunet-service-fs_pr', not in the sense of network-wide
73 * known) associated with each pending request.
72 */ 74 */
73struct GSF_PendingRequestData 75struct GSF_PendingRequestData
74{ 76{
@@ -185,36 +187,18 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
185 187
186 188
187/** 189/**
188 * Get the query for a given pending request.
189 *
190 * @param pr the request
191 * @return pointer to the query (only valid as long as pr is valid)
192 */
193const GNUNET_HashCode *
194GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr);
195
196
197/**
198 * Get the type of a given pending request.
199 *
200 * @param pr the request
201 * @return query type
202 */
203enum GNUNET_BLOCK_Type
204GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr);
205
206
207/**
208 * Generate the message corresponding to the given pending request for 190 * Generate the message corresponding to the given pending request for
209 * transmission to other peers (or at least determine its size). 191 * transmission to other peers (or at least determine its size).
210 * 192 *
211 * @param pr request to generate the message for 193 * @param pr request to generate the message for
194 * @param do_route are we routing the reply
212 * @param buf_size number of bytes available in buf 195 * @param buf_size number of bytes available in buf
213 * @param buf where to copy the message (can be NULL) 196 * @param buf where to copy the message (can be NULL)
214 * @return number of bytes needed (if > buf_size) or used 197 * @return number of bytes needed (if buf_size too small) or used
215 */ 198 */
216size_t 199size_t
217GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 200GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
201 int do_route,
218 size_t buf_size, 202 size_t buf_size,
219 void *buf); 203 void *buf);
220 204
@@ -230,10 +214,12 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr);
230 214
231/** 215/**
232 * Signature of function called on each request. 216 * Signature of function called on each request.
217 * (Note: 'subtype' of GNUNET_CONTAINER_HashMapIterator).
233 * 218 *
234 * @param cls closure 219 * @param cls closure
235 * @param key query for the request 220 * @param key query for the request
236 * @param pr handle to the pending request 221 * @param pr handle to the pending request
222 * @return GNUNET_YES to continue to iterate
237 */ 223 */
238typedef int (*GSF_PendingRequestIterator)(void *cls, 224typedef int (*GSF_PendingRequestIterator)(void *cls,
239 const GNUNET_HashCode *key, 225 const GNUNET_HashCode *key,
@@ -257,19 +243,45 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
257 * this content and possibly passes it on (to local clients or other 243 * this content and possibly passes it on (to local clients or other
258 * peers). Does NOT perform migration (content caching at this peer). 244 * peers). Does NOT perform migration (content caching at this peer).
259 * 245 *
260 * @param other the other peer involved (sender or receiver, NULL 246 * @param cp the other peer involved (sender or receiver, NULL
261 * for loopback messages where we are both sender and receiver) 247 * for loopback messages where we are both sender and receiver)
262 * @param message the actual message 248 * @param message the actual message
263 * @return how valueable was the content to us (0 for not at all), 249 * @return GNUNET_OK if the message was well-formed,
264 * GNUNET_SYSERR if the message was malformed (close connection, 250 * GNUNET_SYSERR if the message was malformed (close connection,
265 * do not cache under any circumstances) 251 * do not cache under any circumstances)
266 */ 252 */
267int 253int
268GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other, 254GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
269 const struct GNUNET_MessageHeader *message); 255 const struct GNUNET_MessageHeader *message);
270 256
271 257
272/** 258/**
259 * Iterator called on each result obtained for a DHT
260 * operation that expects a reply
261 *
262 * @param cls closure, the 'struct GSF_PendingRequest *'.
263 * @param exp when will this value expire
264 * @param key key of the result
265 * @param get_path NULL-terminated array of pointers
266 * to the peers on reverse GET path (or NULL if not recorded)
267 * @param put_path NULL-terminated array of pointers
268 * to the peers on the PUT path (or NULL if not recorded)
269 * @param type type of the result
270 * @param size number of bytes in data
271 * @param data pointer to the result data
272 */
273void
274GSF_handle_dht_reply_ (void *cls,
275 struct GNUNET_TIME_Absolute exp,
276 const GNUNET_HashCode * key,
277 const struct GNUNET_PeerIdentity * const *get_path,
278 const struct GNUNET_PeerIdentity * const *put_path,
279 enum GNUNET_BLOCK_Type type,
280 size_t size,
281 const void *data);
282
283
284/**
273 * Setup the subsystem. 285 * Setup the subsystem.
274 */ 286 */
275void 287void
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
new file mode 100644
index 000000000..9f515e2ee
--- /dev/null
+++ b/src/fs/gnunet-service-fs_push.c
@@ -0,0 +1,475 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
26 */
27#include "platform.h"
28#include "gnunet-service-fs_push.h"
29
30
31/* FIXME: below are only old code fragments to use... */
32
33/**
34 * Block that is ready for migration to other peers. Actual data is at the end of the block.
35 */
36struct MigrationReadyBlock
37{
38
39 /**
40 * This is a doubly-linked list.
41 */
42 struct MigrationReadyBlock *next;
43
44 /**
45 * This is a doubly-linked list.
46 */
47 struct MigrationReadyBlock *prev;
48
49 /**
50 * Query for the block.
51 */
52 GNUNET_HashCode query;
53
54 /**
55 * When does this block expire?
56 */
57 struct GNUNET_TIME_Absolute expiration;
58
59 /**
60 * Peers we would consider forwarding this
61 * block to. Zero for empty entries.
62 */
63 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
64
65 /**
66 * Size of the block.
67 */
68 size_t size;
69
70 /**
71 * Number of targets already used.
72 */
73 unsigned int used_targets;
74
75 /**
76 * Type of the block.
77 */
78 enum GNUNET_BLOCK_Type type;
79};
80
81
82/**
83 * Head of linked list of blocks that can be migrated.
84 */
85static struct MigrationReadyBlock *mig_head;
86
87/**
88 * Tail of linked list of blocks that can be migrated.
89 */
90static struct MigrationReadyBlock *mig_tail;
91
92/**
93 * Request to datastore for migration (or NULL).
94 */
95static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
96
97/**
98 * ID of task that collects blocks for migration.
99 */
100static GNUNET_SCHEDULER_TaskIdentifier mig_task;
101
102/**
103 * What is the maximum frequency at which we are allowed to
104 * poll the datastore for migration content?
105 */
106static struct GNUNET_TIME_Relative min_migration_delay;
107
108/**
109 * Are we allowed to push out content from this peer.
110 */
111static int active_from_migration;
112
113/**
114 * Size of the doubly-linked list of migration blocks.
115 */
116static unsigned int mig_size;
117
118
119/**
120 * Delete the given migration block.
121 *
122 * @param mb block to delete
123 */
124static void
125delete_migration_block (struct MigrationReadyBlock *mb)
126{
127 GNUNET_CONTAINER_DLL_remove (mig_head,
128 mig_tail,
129 mb);
130 GNUNET_PEER_decrement_rcs (mb->target_list,
131 MIGRATION_LIST_SIZE);
132 mig_size--;
133 GNUNET_free (mb);
134}
135
136
137/**
138 * Compare the distance of two peers to a key.
139 *
140 * @param key key
141 * @param p1 first peer
142 * @param p2 second peer
143 * @return GNUNET_YES if P1 is closer to key than P2
144 */
145static int
146is_closer (const GNUNET_HashCode *key,
147 const struct GNUNET_PeerIdentity *p1,
148 const struct GNUNET_PeerIdentity *p2)
149{
150 return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
151 &p2->hashPubKey,
152 key);
153}
154
155
156/**
157 * Consider migrating content to a given peer.
158 *
159 * @param cls 'struct MigrationReadyBlock*' to select
160 * targets for (or NULL for none)
161 * @param key ID of the peer
162 * @param value 'struct ConnectedPeer' of the peer
163 * @return GNUNET_YES (always continue iteration)
164 */
165static int
166consider_migration (void *cls,
167 const GNUNET_HashCode *key,
168 void *value)
169{
170 struct MigrationReadyBlock *mb = cls;
171 struct ConnectedPeer *cp = value;
172 struct MigrationReadyBlock *pos;
173 struct GNUNET_PeerIdentity cppid;
174 struct GNUNET_PeerIdentity otherpid;
175 struct GNUNET_PeerIdentity worstpid;
176 size_t msize;
177 unsigned int i;
178 unsigned int repl;
179
180 /* consider 'cp' as a migration target for mb */
181 if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
182 return GNUNET_YES; /* peer has requested no migration! */
183 if (mb != NULL)
184 {
185 GNUNET_PEER_resolve (cp->pid,
186 &cppid);
187 repl = MIGRATION_LIST_SIZE;
188 for (i=0;i<MIGRATION_LIST_SIZE;i++)
189 {
190 if (mb->target_list[i] == 0)
191 {
192 mb->target_list[i] = cp->pid;
193 GNUNET_PEER_change_rc (mb->target_list[i], 1);
194 repl = MIGRATION_LIST_SIZE;
195 break;
196 }
197 GNUNET_PEER_resolve (mb->target_list[i],
198 &otherpid);
199 if ( (repl == MIGRATION_LIST_SIZE) &&
200 is_closer (&mb->query,
201 &cppid,
202 &otherpid))
203 {
204 repl = i;
205 worstpid = otherpid;
206 }
207 else if ( (repl != MIGRATION_LIST_SIZE) &&
208 (is_closer (&mb->query,
209 &worstpid,
210 &otherpid) ) )
211 {
212 repl = i;
213 worstpid = otherpid;
214 }
215 }
216 if (repl != MIGRATION_LIST_SIZE)
217 {
218 GNUNET_PEER_change_rc (mb->target_list[repl], -1);
219 mb->target_list[repl] = cp->pid;
220 GNUNET_PEER_change_rc (mb->target_list[repl], 1);
221 }
222 }
223
224 /* consider scheduling transmission to cp for content migration */
225 if (cp->cth != NULL)
226 return GNUNET_YES;
227 msize = 0;
228 pos = mig_head;
229 while (pos != NULL)
230 {
231 for (i=0;i<MIGRATION_LIST_SIZE;i++)
232 {
233 if (cp->pid == pos->target_list[i])
234 {
235 if (msize == 0)
236 msize = pos->size;
237 else
238 msize = GNUNET_MIN (msize,
239 pos->size);
240 break;
241 }
242 }
243 pos = pos->next;
244 }
245 if (msize == 0)
246 return GNUNET_YES; /* no content available */
247#if DEBUG_FS
248 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
249 "Trying to migrate at least %u bytes to peer `%s'\n",
250 msize,
251 GNUNET_h2s (key));
252#endif
253 if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
254 {
255 GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
256 cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
257 }
258 cp->cth
259 = GNUNET_CORE_notify_transmit_ready (core,
260 0, GNUNET_TIME_UNIT_FOREVER_REL,
261 (const struct GNUNET_PeerIdentity*) key,
262 msize + sizeof (struct PutMessage),
263 &transmit_to_peer,
264 cp);
265 return GNUNET_YES;
266}
267
268
269/**
270 * Task that is run periodically to obtain blocks for content
271 * migration
272 *
273 * @param cls unused
274 * @param tc scheduler context (also unused)
275 */
276static void
277gather_migration_blocks (void *cls,
278 const struct GNUNET_SCHEDULER_TaskContext *tc);
279
280
281
282
283/**
284 * If the migration task is not currently running, consider
285 * (re)scheduling it with the appropriate delay.
286 */
287static void
288consider_migration_gathering ()
289{
290 struct GNUNET_TIME_Relative delay;
291
292 if (dsh == NULL)
293 return;
294 if (mig_qe != NULL)
295 return;
296 if (mig_task != GNUNET_SCHEDULER_NO_TASK)
297 return;
298 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
299 mig_size);
300 delay = GNUNET_TIME_relative_divide (delay,
301 MAX_MIGRATION_QUEUE);
302 delay = GNUNET_TIME_relative_max (delay,
303 min_migration_delay);
304 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
305 &gather_migration_blocks,
306 NULL);
307}
308
309
310
311
312/**
313 * Process content offered for migration.
314 *
315 * @param cls closure
316 * @param key key for the content
317 * @param size number of bytes in data
318 * @param data content stored
319 * @param type type of the content
320 * @param priority priority of the content
321 * @param anonymity anonymity-level for the content
322 * @param expiration expiration time for the content
323 * @param uid unique identifier for the datum;
324 * maybe 0 if no unique identifier is available
325 */
326static void
327process_migration_content (void *cls,
328 const GNUNET_HashCode * key,
329 size_t size,
330 const void *data,
331 enum GNUNET_BLOCK_Type type,
332 uint32_t priority,
333 uint32_t anonymity,
334 struct GNUNET_TIME_Absolute
335 expiration, uint64_t uid)
336{
337 struct MigrationReadyBlock *mb;
338
339 if (key == NULL)
340 {
341 mig_qe = NULL;
342 if (mig_size < MAX_MIGRATION_QUEUE)
343 consider_migration_gathering ();
344 return;
345 }
346 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value <
347 MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
348 {
349 /* content will expire soon, don't bother */
350 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
351 return;
352 }
353 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
354 {
355 if (GNUNET_OK !=
356 GNUNET_FS_handle_on_demand_block (key, size, data,
357 type, priority, anonymity,
358 expiration, uid,
359 &process_migration_content,
360 NULL))
361 {
362 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
363 }
364 return;
365 }
366#if DEBUG_FS
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
368 "Retrieved block `%s' of type %u for migration\n",
369 GNUNET_h2s (key),
370 type);
371#endif
372 mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
373 mb->query = *key;
374 mb->expiration = expiration;
375 mb->size = size;
376 mb->type = type;
377 memcpy (&mb[1], data, size);
378 GNUNET_CONTAINER_DLL_insert_after (mig_head,
379 mig_tail,
380 mig_tail,
381 mb);
382 mig_size++;
383 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
384 &consider_migration,
385 mb);
386 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
387}
388
389
390
391/**
392 * Task that is run periodically to obtain blocks for content
393 * migration
394 *
395 * @param cls unused
396 * @param tc scheduler context (also unused)
397 */
398static void
399gather_migration_blocks (void *cls,
400 const struct GNUNET_SCHEDULER_TaskContext *tc)
401{
402 mig_task = GNUNET_SCHEDULER_NO_TASK;
403 if (dsh != NULL)
404 {
405 mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
406 GNUNET_TIME_UNIT_FOREVER_REL,
407 &process_migration_content, NULL);
408 GNUNET_assert (mig_qe != NULL);
409 }
410}
411
412
413
414size_t
415API_ (void *cls,
416 size_t size, void *buf)
417{
418 next = mig_head;
419 while (NULL != (mb = next))
420 {
421 next = mb->next;
422 for (i=0;i<MIGRATION_LIST_SIZE;i++)
423 {
424 if ( (cp->pid == mb->target_list[i]) &&
425 (mb->size + sizeof (migm) <= size) )
426 {
427 GNUNET_PEER_change_rc (mb->target_list[i], -1);
428 mb->target_list[i] = 0;
429 mb->used_targets++;
430 memset (&migm, 0, sizeof (migm));
431 migm.header.size = htons (sizeof (migm) + mb->size);
432 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
433 migm.type = htonl (mb->type);
434 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
435 memcpy (&cbuf[msize], &migm, sizeof (migm));
436 msize += sizeof (migm);
437 size -= sizeof (migm);
438 memcpy (&cbuf[msize], &mb[1], mb->size);
439 msize += mb->size;
440 size -= mb->size;
441#if DEBUG_FS
442 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443 "Pushing migration block `%s' (%u bytes) to `%s'\n",
444 GNUNET_h2s (&mb->query),
445 (unsigned int) mb->size,
446 GNUNET_i2s (&pid));
447#endif
448 break;
449 }
450 else
451 {
452#if DEBUG_FS
453 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
454 "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
455 GNUNET_h2s (&mb->query),
456 (unsigned int) mb->size,
457 GNUNET_i2s (&pid));
458#endif
459 }
460 }
461 if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
462 (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
463 {
464 delete_migration_block (mb);
465 consider_migration_gathering ();
466 }
467 }
468 consider_migration (NULL,
469 &pid.hashPubKey,
470 cp);
471
472}
473
474
475
diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c
new file mode 100644
index 000000000..eb7289f1e
--- /dev/null
+++ b/src/fs/gnunet-service-fs_put.c
@@ -0,0 +1,197 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_put.c
23 * @brief API to PUT zero-anonymity index data from our datastore into the DHT
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet-service-fs_put.h"
28
29/* FIXME: below are only old code fragments to use... */
30
31
32/**
33 * Request to datastore for DHT PUTs (or NULL).
34 */
35static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
36
37
38/**
39 * Type we will request for the next DHT PUT round from the datastore.
40 */
41static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
42
43/**
44 * ID of task that collects blocks for DHT PUTs.
45 */
46static GNUNET_SCHEDULER_TaskIdentifier dht_task;
47
48/**
49 * How many entires with zero anonymity do we currently estimate
50 * to have in the database?
51 */
52static unsigned int zero_anonymity_count_estimate;
53
54
55
56
57
58/**
59 * Task that is run periodically to obtain blocks for DHT PUTs.
60 *
61 * @param cls type of blocks to gather
62 * @param tc scheduler context (unused)
63 */
64static void
65gather_dht_put_blocks (void *cls,
66 const struct GNUNET_SCHEDULER_TaskContext *tc);
67
68
69
70/**
71 * If the DHT PUT gathering task is not currently running, consider
72 * (re)scheduling it with the appropriate delay.
73 */
74static void
75consider_dht_put_gathering (void *cls)
76{
77 struct GNUNET_TIME_Relative delay;
78
79 if (dsh == NULL)
80 return;
81 if (dht_qe != NULL)
82 return;
83 if (dht_task != GNUNET_SCHEDULER_NO_TASK)
84 return;
85 if (zero_anonymity_count_estimate > 0)
86 {
87 delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
88 zero_anonymity_count_estimate);
89 delay = GNUNET_TIME_relative_min (delay,
90 MAX_DHT_PUT_FREQ);
91 }
92 else
93 {
94 /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
95 (hopefully) appear */
96 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
97 }
98 dht_task = GNUNET_SCHEDULER_add_delayed (delay,
99 &gather_dht_put_blocks,
100 cls);
101}
102
103
104
105/**
106 * Store content in DHT.
107 *
108 * @param cls closure
109 * @param key key for the content
110 * @param size number of bytes in data
111 * @param data content stored
112 * @param type type of the content
113 * @param priority priority of the content
114 * @param anonymity anonymity-level for the content
115 * @param expiration expiration time for the content
116 * @param uid unique identifier for the datum;
117 * maybe 0 if no unique identifier is available
118 */
119static void
120process_dht_put_content (void *cls,
121 const GNUNET_HashCode * key,
122 size_t size,
123 const void *data,
124 enum GNUNET_BLOCK_Type type,
125 uint32_t priority,
126 uint32_t anonymity,
127 struct GNUNET_TIME_Absolute
128 expiration, uint64_t uid)
129{
130 static unsigned int counter;
131 static GNUNET_HashCode last_vhash;
132 static GNUNET_HashCode vhash;
133
134 if (key == NULL)
135 {
136 dht_qe = NULL;
137 consider_dht_put_gathering (cls);
138 return;
139 }
140 /* slightly funky code to estimate the total number of values with zero
141 anonymity from the maximum observed length of a monotonically increasing
142 sequence of hashes over the contents */
143 GNUNET_CRYPTO_hash (data, size, &vhash);
144 if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
145 {
146 if (zero_anonymity_count_estimate > 0)
147 zero_anonymity_count_estimate /= 2;
148 counter = 0;
149 }
150 last_vhash = vhash;
151 if (counter < 31)
152 counter++;
153 if (zero_anonymity_count_estimate < (1 << counter))
154 zero_anonymity_count_estimate = (1 << counter);
155#if DEBUG_FS
156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
157 "Retrieved block `%s' of type %u for DHT PUT\n",
158 GNUNET_h2s (key),
159 type);
160#endif
161 GNUNET_DHT_put (dht_handle,
162 key,
163 DEFAULT_PUT_REPLICATION,
164 GNUNET_DHT_RO_NONE,
165 type,
166 size,
167 data,
168 expiration,
169 GNUNET_TIME_UNIT_FOREVER_REL,
170 &dht_put_continuation,
171 cls);
172}
173
174
175
176/**
177 * Task that is run periodically to obtain blocks for DHT PUTs.
178 *
179 * @param cls type of blocks to gather
180 * @param tc scheduler context (unused)
181 */
182static void
183gather_dht_put_blocks (void *cls,
184 const struct GNUNET_SCHEDULER_TaskContext *tc)
185{
186 dht_task = GNUNET_SCHEDULER_NO_TASK;
187 if (dsh != NULL)
188 {
189 if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
190 dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
191 dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
192 GNUNET_TIME_UNIT_FOREVER_REL,
193 dht_put_type++,
194 &process_dht_put_content, NULL);
195 GNUNET_assert (dht_qe != NULL);
196 }
197}