aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-02-10 12:59:38 +0000
committerChristian Grothoff <christian@grothoff.org>2011-02-10 12:59:38 +0000
commitf54389f6724ecbd39389d53fba7b3bfdb2e0a8eb (patch)
tree11a7156180b22e4eaf784f5b1e400261c00e3ef9 /src/fs/gnunet-service-fs_cp.c
parent3a39cd4cd22e345733ba225e7a4c0b6eecdad7df (diff)
downloadgnunet-f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb.tar.gz
gnunet-f54389f6724ecbd39389d53fba7b3bfdb2e0a8eb.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c446
1 files changed, 436 insertions, 10 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 2361cd4fc..903549cb7 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -141,11 +141,6 @@ struct GSF_ConnectedPeer
141 uint64_t inc_preference; 141 uint64_t inc_preference;
142 142
143 /** 143 /**
144 * Trust rating for this peer
145 */
146 uint32_t trust;
147
148 /**
149 * Trust rating for this peer on disk. 144 * Trust rating for this peer on disk.
150 */ 145 */
151 uint32_t disk_trust; 146 uint32_t disk_trust;
@@ -249,6 +244,19 @@ update_atsi (struct GSF_ConnectedPeer *cp,
249 244
250 245
251/** 246/**
247 * Return the performance data record for the given peer
248 *
249 * @param cp peer to query
250 * @return performance data record for the peer
251 */
252struct GSF_PeerPerformanceData *
253GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
254{
255 return &cp->ppd;
256}
257
258
259/**
252 * Core is ready to transmit to a peer, get the message. 260 * Core is ready to transmit to a peer, get the message.
253 * 261 *
254 * @param cls the 'struct GSF_PeerTransmitHandle' of the message 262 * @param cls the 'struct GSF_PeerTransmitHandle' of the message
@@ -420,13 +428,95 @@ GSF_handle_p2p_migration_stop_ (void *cls,
420 GNUNET_break (0); 428 GNUNET_break (0);
421 return GNUNET_OK; 429 return GNUNET_OK;
422 } 430 }
423 cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); 431 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
424 update_atsi (cp, atsi); 432 update_atsi (cp, atsi);
425 return GNUNET_OK; 433 return GNUNET_OK;
426} 434}
427 435
428 436
429/** 437/**
438 * Handle a reply to a pending request. Also called if a request
439 * expires (then with data == NULL). The handler may be called
440 * many times (depending on the request type), but will not be
441 * called during or after a call to GSF_pending_request_cancel
442 * and will also not be called anymore after a call signalling
443 * expiration.
444 *
445 * @param cls user-specified closure
446 * @param pr handle to the original pending request
447 * @param data response data, NULL on request expiration
448 * @param data_len number of bytes in data
449 */
450static void
451handle_p2p_reply (void *cls,
452 struct GSF_PendingRequest *pr,
453 const void *data,
454 size_t data_len)
455{
456#if SUPPORT_DELAYS
457 struct GNUNET_TIME_Relative art_delay;
458#endif
459
460 /* FIXME: adapt code fragments below to new API! */
461
462
463 /* reply will go over the network, check for cover traffic */
464 if ( (prq->anonymity_level > 1) &&
465 (cover_content_count < prq->anonymity_level - 1) )
466 {
467 /* insufficient cover traffic, skip */
468 GNUNET_STATISTICS_update (stats,
469 gettext_noop ("# replies suppressed due to lack of cover traffic"),
470 1,
471 GNUNET_NO);
472 return GNUNET_YES;
473 }
474 if (prq->anonymity_level > 1)
475 cover_content_count -= prq->anonymity_level - 1;
476
477
478 cp = pr->cp;
479#if DEBUG_FS
480 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481 "Transmitting result for query `%s' to other peer (PID=%u)\n",
482 GNUNET_h2s (key),
483 (unsigned int) cp->pid);
484#endif
485 GNUNET_STATISTICS_update (stats,
486 gettext_noop ("# replies received for other peers"),
487 1,
488 GNUNET_NO);
489 msize = sizeof (struct PutMessage) + prq->size;
490 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
491 reply->cont = &transmit_reply_continuation;
492 reply->cont_cls = pr;
493#if SUPPORT_DELAYS
494 art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
495 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
496 TTL_DECREMENT));
497 reply->delay_until
498 = GNUNET_TIME_relative_to_absolute (art_delay);
499 GNUNET_STATISTICS_update (stats,
500 gettext_noop ("cummulative artificial delay introduced (ms)"),
501 art_delay.abs_value,
502 GNUNET_NO);
503#endif
504 reply->msize = msize;
505 reply->priority = UINT32_MAX; /* send replies first! */
506 pm = (struct PutMessage*) &reply[1];
507 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
508 pm->header.size = htons (msize);
509 pm->type = htonl (prq->type);
510 pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
511 memcpy (&pm[1], prq->data, prq->size);
512 add_to_pending_messages_for_peer (cp, reply, pr);
513
514
515}
516
517
518
519/**
430 * Handle P2P "QUERY" message. 520 * Handle P2P "QUERY" message.
431 * 521 *
432 * @param other the other peer involved (sender or receiver, NULL 522 * @param other the other peer involved (sender or receiver, NULL
@@ -438,9 +528,310 @@ struct GSF_PendingRequest *
438GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 528GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
439 const struct GNUNET_MessageHeader *message) 529 const struct GNUNET_MessageHeader *message)
440{ 530{
531 /* FIXME: adapt old code to new API! */
532 struct PendingRequest *pr;
533 struct ConnectedPeer *cp;
534 struct ConnectedPeer *cps;
535 struct CheckDuplicateRequestClosure cdc;
536 struct GNUNET_TIME_Relative timeout;
537 uint16_t msize;
538 const struct GetMessage *gm;
539 unsigned int bits;
540 const GNUNET_HashCode *opt;
541 uint32_t bm;
542 size_t bfsize;
543 uint32_t ttl_decrement;
544 int32_t priority;
545 enum GNUNET_BLOCK_Type type;
546 int have_ns;
547
548 msize = ntohs(message->size);
549 if (msize < sizeof (struct GetMessage))
550 {
551 GNUNET_break_op (0);
552 return GNUNET_SYSERR;
553 }
554 gm = (const struct GetMessage*) message;
555#if DEBUG_FS
556 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557 "Received request for `%s'\n",
558 GNUNET_h2s (&gm->query));
559#endif
560 type = ntohl (gm->type);
561 bm = ntohl (gm->hash_bitmap);
562 bits = 0;
563 while (bm > 0)
564 {
565 if (1 == (bm & 1))
566 bits++;
567 bm >>= 1;
568 }
569 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
570 {
571 GNUNET_break_op (0);
572 return GNUNET_SYSERR;
573 }
574 opt = (const GNUNET_HashCode*) &gm[1];
575 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
576 /* bfsize must be power of 2, check! */
577 if (0 != ( (bfsize - 1) & bfsize))
578 {
579 GNUNET_break_op (0);
580 return GNUNET_SYSERR;
581 }
582 cover_query_count++;
583 bm = ntohl (gm->hash_bitmap);
584 bits = 0;
585 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
586 &other->hashPubKey);
587 if (NULL == cps)
588 {
589 /* peer must have just disconnected */
590 GNUNET_STATISTICS_update (stats,
591 gettext_noop ("# requests dropped due to initiator not being connected"),
592 1,
593 GNUNET_NO);
594 return GNUNET_SYSERR;
595 }
596 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
597 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
598 &opt[bits++]);
599 else
600 cp = cps;
601 if (cp == NULL)
602 {
603#if DEBUG_FS
604 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
606 "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
607 GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
608
609 else
610 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
611 "Failed to find peer `%4s' in connection set. Dropping query.\n",
612 GNUNET_i2s (other));
613#endif
614 GNUNET_STATISTICS_update (stats,
615 gettext_noop ("# requests dropped due to missing reverse route"),
616 1,
617 GNUNET_NO);
618 /* FIXME: try connect? */
619 return GNUNET_OK;
620 }
621 /* note that we can really only check load here since otherwise
622 peers could find out that we are overloaded by not being
623 disconnected after sending us a malformed query... */
624 priority = bound_priority (ntohl (gm->priority), cps);
625 if (priority < 0)
626 {
627#if DEBUG_FS
628 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629 "Dropping query from `%s', this peer is too busy.\n",
630 GNUNET_i2s (other));
631#endif
632 return GNUNET_OK;
633 }
634#if DEBUG_FS
635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636 "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
637 GNUNET_h2s (&gm->query),
638 (unsigned int) type,
639 GNUNET_i2s (other),
640 (unsigned int) bm);
641#endif
642 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
643 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
644 (have_ns ? sizeof(GNUNET_HashCode) : 0));
645 if (have_ns)
646 {
647 pr->namespace = (GNUNET_HashCode*) &pr[1];
648 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
649 }
650 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
651 (GNUNET_LOAD_get_average (cp->transmission_delay) >
652 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
653 {
654 /* don't have BW to send to peer, or would likely take longer than we have for it,
655 so at best indirect the query */
656 priority = 0;
657 pr->forward_only = GNUNET_YES;
658 }
659 pr->type = type;
660 pr->mingle = ntohl (gm->filter_mutator);
661 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
662 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
663 pr->anonymity_level = 1;
664 pr->priority = (uint32_t) priority;
665 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
666 pr->query = gm->query;
667 /* decrement ttl (always) */
668 ttl_decrement = 2 * TTL_DECREMENT +
669 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
670 TTL_DECREMENT);
671 if ( (pr->ttl < 0) &&
672 (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
673 {
674#if DEBUG_FS
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
677 GNUNET_i2s (other),
678 pr->ttl,
679 ttl_decrement);
680#endif
681 GNUNET_STATISTICS_update (stats,
682 gettext_noop ("# requests dropped due TTL underflow"),
683 1,
684 GNUNET_NO);
685 /* integer underflow => drop (should be very rare)! */
686 GNUNET_free (pr);
687 return GNUNET_OK;
688 }
689 pr->ttl -= ttl_decrement;
690 pr->start_time = GNUNET_TIME_absolute_get ();
691
692 /* get bloom filter */
693 if (bfsize > 0)
694 {
695 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
696 bfsize,
697 BLOOMFILTER_K);
698 pr->bf_size = bfsize;
699 }
700 cdc.have = NULL;
701 cdc.pr = pr;
702 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
703 &gm->query,
704 &check_duplicate_request_peer,
705 &cdc);
706 if (cdc.have != NULL)
707 {
708 if (cdc.have->start_time.abs_value + cdc.have->ttl >=
709 pr->start_time.abs_value + pr->ttl)
710 {
711 /* existing request has higher TTL, drop new one! */
712 cdc.have->priority += pr->priority;
713 destroy_pending_request (pr);
714#if DEBUG_FS
715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716 "Have existing request with higher TTL, dropping new request.\n",
717 GNUNET_i2s (other));
718#endif
719 GNUNET_STATISTICS_update (stats,
720 gettext_noop ("# requests dropped due to higher-TTL request"),
721 1,
722 GNUNET_NO);
723 return GNUNET_OK;
724 }
725 else
726 {
727 /* existing request has lower TTL, drop old one! */
728 pr->priority += cdc.have->priority;
729 /* Possible optimization: if we have applicable pending
730 replies in 'cdc.have', we might want to move those over
731 (this is a really rare special-case, so it is not clear
732 that this would be worth it) */
733 destroy_pending_request (cdc.have);
734 /* keep processing 'pr'! */
735 }
736 }
737
738 pr->cp = cp;
739 GNUNET_break (GNUNET_OK ==
740 GNUNET_CONTAINER_multihashmap_put (query_request_map,
741 &gm->query,
742 pr,
743 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
744 GNUNET_break (GNUNET_OK ==
745 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
746 &other->hashPubKey,
747 pr,
748 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
749
750 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
751 pr,
752 pr->start_time.abs_value + pr->ttl);
753
754 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# P2P searches received"),
756 1,
757 GNUNET_NO);
758 GNUNET_STATISTICS_update (stats,
759 gettext_noop ("# P2P searches active"),
760 1,
761 GNUNET_NO);
762
763 /* calculate change in traffic preference */
764 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
765 /* process locally */
766 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
767 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
768 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
769 (pr->priority + 1));
770 if (GNUNET_YES != pr->forward_only)
771 {
772#if DEBUG_FS
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "Handing request for `%s' to datastore\n",
775 GNUNET_h2s (&gm->query));
776#endif
777 pr->qe = GNUNET_DATASTORE_get (dsh,
778 &gm->query,
779 type,
780 pr->priority + 1,
781 MAX_DATASTORE_QUEUE,
782 timeout,
783 &process_local_reply,
784 pr);
785 if (NULL == pr->qe)
786 {
787 GNUNET_STATISTICS_update (stats,
788 gettext_noop ("# requests dropped by datastore (queue length limit)"),
789 1,
790 GNUNET_NO);
791 }
792 }
793 else
794 {
795 GNUNET_STATISTICS_update (stats,
796 gettext_noop ("# requests forwarded due to high load"),
797 1,
798 GNUNET_NO);
799 }
800
801 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
802 switch (pr->type)
803 {
804 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
805 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
806 /* only one result, wait for datastore */
807 if (GNUNET_YES != pr->forward_only)
808 {
809 GNUNET_STATISTICS_update (stats,
810 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
811 1,
812 GNUNET_NO);
813 break;
814 }
815 default:
816 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
817 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
818 pr);
819 }
820
821 /* make sure we don't track too many requests */
822 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
823 {
824 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
825 GNUNET_assert (pr != NULL);
826 destroy_pending_request (pr);
827 }
828 return GNUNET_OK;
829
830
831
441 // FIXME! 832 // FIXME!
442 // parse request 833 // parse request
443 // setup pending request 834 // setup pending request (use 'handle_p2p_reply')
444 // track pending request to cancel it on peer disconnect (!) 835 // track pending request to cancel it on peer disconnect (!)
445 // return it! 836 // return it!
446 // (actual planning & execution up to caller!) 837 // (actual planning & execution up to caller!)
@@ -815,6 +1206,41 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
815 1206
816 1207
817/** 1208/**
1209 * Ask a peer to stop migrating data to us until the given point
1210 * in time.
1211 *
1212 * @param cp peer to ask
1213 * @param block_time until when to block
1214 */
1215void
1216GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1217 struct GNUNET_TIME_Relative block_time)
1218{
1219 struct PendingMessage *pm;
1220 struct MigrationStopMessage *msm;
1221
1222 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1223 return; /* already blocked */
1224 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1225
1226 /* FIXME: adapt old code below to new API! */
1227 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
1228 sizeof (struct MigrationStopMessage));
1229 pm->msize = sizeof (struct MigrationStopMessage);
1230 pm->priority = UINT32_MAX;
1231 msm = (struct MigrationStopMessage*) &pm[1];
1232 msm->header.size = htons (sizeof (struct MigrationStopMessage));
1233 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1234 msm->duration = GNUNET_TIME_relative_hton (block_time);
1235 add_to_pending_messages_for_peer (cp,
1236 pm,
1237 NULL);
1238}
1239
1240
1241
1242
1243/**
818 * Write host-trust information to a file - flush the buffer entry! 1244 * Write host-trust information to a file - flush the buffer entry!
819 * 1245 *
820 * @param cls closure, not used 1246 * @param cls closure, not used
@@ -965,9 +1391,9 @@ GSF_connected_peer_done_ ()
965 * @return GNUNET_YES (we should continue to iterate) 1391 * @return GNUNET_YES (we should continue to iterate)
966 */ 1392 */
967static int 1393static int
968clean_peer (void *cls, 1394clean_local_client (void *cls,
969 const GNUNET_HashCode * key, 1395 const GNUNET_HashCode * key,
970 void *value) 1396 void *value)
971{ 1397{
972 const struct GSF_LocalClient *lc = cls; 1398 const struct GSF_LocalClient *lc = cls;
973 struct GSF_ConnectedPeer *cp = value; 1399 struct GSF_ConnectedPeer *cp = value;