diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
commit | a78990b412db2c0ead2da8061c4f454f068991d1 (patch) | |
tree | 2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_pr.c | |
parent | 406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff) | |
download | gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip |
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 93 |
1 files changed, 42 insertions, 51 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index cd58992c1..f8a7b61f0 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -512,21 +512,17 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, | |||
512 | 512 | ||
513 | /** | 513 | /** |
514 | * Generate the message corresponding to the given pending request for | 514 | * Generate the message corresponding to the given pending request for |
515 | * transmission to other peers (or at least determine its size). | 515 | * transmission to other peers. |
516 | * | 516 | * |
517 | * @param pr request to generate the message for | 517 | * @param pr request to generate the message for |
518 | * @param buf_size number of bytes available in @a buf | 518 | * @return envelope with the request message |
519 | * @param buf where to copy the message (can be NULL) | ||
520 | * @return number of bytes needed (if `>` @a buf_size) or used | ||
521 | */ | 519 | */ |
522 | size_t | 520 | struct GNUNET_MQ_Envelope * |
523 | GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | 521 | GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr) |
524 | size_t buf_size, void *buf) | ||
525 | { | 522 | { |
526 | char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; | 523 | struct GNUNET_MQ_Envelope *env; |
527 | struct GetMessage *gm; | 524 | struct GetMessage *gm; |
528 | struct GNUNET_PeerIdentity *ext; | 525 | struct GNUNET_PeerIdentity *ext; |
529 | size_t msize; | ||
530 | unsigned int k; | 526 | unsigned int k; |
531 | uint32_t bm; | 527 | uint32_t bm; |
532 | uint32_t prio; | 528 | uint32_t prio; |
@@ -535,11 +531,10 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
535 | int64_t ttl; | 531 | int64_t ttl; |
536 | int do_route; | 532 | int do_route; |
537 | 533 | ||
538 | if (buf_size > 0) | 534 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
539 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 535 | "Building request message for `%s' of type %d\n", |
540 | "Building request message for `%s' of type %d\n", | 536 | GNUNET_h2s (&pr->public_data.query), |
541 | GNUNET_h2s (&pr->public_data.query), | 537 | pr->public_data.type); |
542 | pr->public_data.type); | ||
543 | k = 0; | 538 | k = 0; |
544 | bm = 0; | 539 | bm = 0; |
545 | do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); | 540 | do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); |
@@ -559,13 +554,9 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
559 | k++; | 554 | k++; |
560 | } | 555 | } |
561 | bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); | 556 | bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); |
562 | msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity); | 557 | env = GNUNET_MQ_msg_extra (gm, |
563 | GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); | 558 | bf_size + k * sizeof (struct GNUNET_PeerIdentity), |
564 | if (buf_size < msize) | 559 | GNUNET_MESSAGE_TYPE_FS_GET); |
565 | return msize; | ||
566 | gm = (struct GetMessage *) lbuf; | ||
567 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); | ||
568 | gm->header.size = htons (msize); | ||
569 | gm->type = htonl (pr->public_data.type); | 560 | gm->type = htonl (pr->public_data.type); |
570 | if (do_route) | 561 | if (do_route) |
571 | prio = | 562 | prio = |
@@ -585,7 +576,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
585 | gm->query = pr->public_data.query; | 576 | gm->query = pr->public_data.query; |
586 | ext = (struct GNUNET_PeerIdentity *) &gm[1]; | 577 | ext = (struct GNUNET_PeerIdentity *) &gm[1]; |
587 | k = 0; | 578 | k = 0; |
588 | if (!do_route) | 579 | if (! do_route) |
589 | GNUNET_PEER_resolve (pr->sender_pid, | 580 | GNUNET_PEER_resolve (pr->sender_pid, |
590 | &ext[k++]); | 581 | &ext[k++]); |
591 | if (NULL != pr->public_data.target) | 582 | if (NULL != pr->public_data.target) |
@@ -595,8 +586,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
595 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, | 586 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, |
596 | (char *) &ext[k], | 587 | (char *) &ext[k], |
597 | bf_size)); | 588 | bf_size)); |
598 | GNUNET_memcpy (buf, gm, msize); | 589 | return env; |
599 | return msize; | ||
600 | } | 590 | } |
601 | 591 | ||
602 | 592 | ||
@@ -1699,18 +1689,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, | |||
1699 | * this content and possibly passes it on (to local clients or other | 1689 | * this content and possibly passes it on (to local clients or other |
1700 | * peers). Does NOT perform migration (content caching at this peer). | 1690 | * peers). Does NOT perform migration (content caching at this peer). |
1701 | * | 1691 | * |
1702 | * @param cp the other peer involved (sender or receiver, NULL | 1692 | * @param cls the other peer involved |
1703 | * for loopback messages where we are both sender and receiver) | 1693 | * @param put the actual message |
1704 | * @param message the actual message | ||
1705 | * @return #GNUNET_OK if the message was well-formed, | ||
1706 | * #GNUNET_SYSERR if the message was malformed (close connection, | ||
1707 | * do not cache under any circumstances) | ||
1708 | */ | 1694 | */ |
1709 | int | 1695 | void |
1710 | GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | 1696 | handle_p2p_put (void *cls, |
1711 | const struct GNUNET_MessageHeader *message) | 1697 | const struct PutMessage *put) |
1712 | { | 1698 | { |
1713 | const struct PutMessage *put; | 1699 | struct GSF_ConnectedPeer *cp = cls; |
1714 | uint16_t msize; | 1700 | uint16_t msize; |
1715 | size_t dsize; | 1701 | size_t dsize; |
1716 | enum GNUNET_BLOCK_Type type; | 1702 | enum GNUNET_BLOCK_Type type; |
@@ -1721,21 +1707,17 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1721 | double putl; | 1707 | double putl; |
1722 | struct PutMigrationContext *pmc; | 1708 | struct PutMigrationContext *pmc; |
1723 | 1709 | ||
1724 | msize = ntohs (message->size); | 1710 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1725 | if (msize < sizeof (struct PutMessage)) | 1711 | "Received P2P PUT from %s\n", |
1726 | { | 1712 | GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer)); |
1727 | GNUNET_break_op (0); | 1713 | GSF_cover_content_count++; |
1728 | return GNUNET_SYSERR; | 1714 | msize = ntohs (put->header.size); |
1729 | } | ||
1730 | put = (const struct PutMessage *) message; | ||
1731 | dsize = msize - sizeof (struct PutMessage); | 1715 | dsize = msize - sizeof (struct PutMessage); |
1732 | type = ntohl (put->type); | 1716 | type = ntohl (put->type); |
1733 | expiration = GNUNET_TIME_absolute_ntoh (put->expiration); | 1717 | expiration = GNUNET_TIME_absolute_ntoh (put->expiration); |
1734 | /* do not allow migrated content to live longer than 1 year */ | 1718 | /* do not allow migrated content to live longer than 1 year */ |
1735 | expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), | 1719 | expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), |
1736 | expiration); | 1720 | expiration); |
1737 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) | ||
1738 | return GNUNET_SYSERR; | ||
1739 | if (GNUNET_OK != | 1721 | if (GNUNET_OK != |
1740 | GNUNET_BLOCK_get_key (GSF_block_ctx, | 1722 | GNUNET_BLOCK_get_key (GSF_block_ctx, |
1741 | type, | 1723 | type, |
@@ -1744,7 +1726,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1744 | &query)) | 1726 | &query)) |
1745 | { | 1727 | { |
1746 | GNUNET_break_op (0); | 1728 | GNUNET_break_op (0); |
1747 | return GNUNET_SYSERR; | 1729 | return; |
1748 | } | 1730 | } |
1749 | GNUNET_STATISTICS_update (GSF_stats, | 1731 | GNUNET_STATISTICS_update (GSF_stats, |
1750 | gettext_noop ("# GAP PUT messages received"), | 1732 | gettext_noop ("# GAP PUT messages received"), |
@@ -1786,11 +1768,19 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1786 | GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, | 1768 | GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, |
1787 | &pmc->origin); | 1769 | &pmc->origin); |
1788 | if (NULL == | 1770 | if (NULL == |
1789 | GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, | 1771 | GNUNET_DATASTORE_put (GSF_dsh, |
1790 | prq.priority, 1 /* anonymity */ , | 1772 | 0, |
1773 | &query, | ||
1774 | dsize, | ||
1775 | &put[1], | ||
1776 | type, | ||
1777 | prq.priority, | ||
1778 | 1 /* anonymity */ , | ||
1791 | 0 /* replication */ , | 1779 | 0 /* replication */ , |
1792 | expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, | 1780 | expiration, 1 + prq.priority, |
1793 | &put_migration_continuation, pmc)) | 1781 | MAX_DATASTORE_QUEUE, |
1782 | &put_migration_continuation, | ||
1783 | pmc)) | ||
1794 | { | 1784 | { |
1795 | put_migration_continuation (pmc, | 1785 | put_migration_continuation (pmc, |
1796 | GNUNET_SYSERR, | 1786 | GNUNET_SYSERR, |
@@ -1802,7 +1792,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1802 | { | 1792 | { |
1803 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1793 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1804 | "Choosing not to keep content `%s' (%d/%d)\n", | 1794 | "Choosing not to keep content `%s' (%d/%d)\n", |
1805 | GNUNET_h2s (&query), active_to_migration, | 1795 | GNUNET_h2s (&query), |
1796 | active_to_migration, | ||
1806 | test_put_load_too_high (prq.priority)); | 1797 | test_put_load_too_high (prq.priority)); |
1807 | } | 1798 | } |
1808 | putl = GNUNET_LOAD_get_load (datastore_put_load); | 1799 | putl = GNUNET_LOAD_get_load (datastore_put_load); |
@@ -1826,9 +1817,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1826 | putl, | 1817 | putl, |
1827 | active_to_migration, | 1818 | active_to_migration, |
1828 | (GNUNET_NO == prq.request_found)); | 1819 | (GNUNET_NO == prq.request_found)); |
1829 | GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); | 1820 | GSF_block_peer_migration_ (cp, |
1821 | GNUNET_TIME_relative_to_absolute (block_time)); | ||
1830 | } | 1822 | } |
1831 | return GNUNET_OK; | ||
1832 | } | 1823 | } |
1833 | 1824 | ||
1834 | 1825 | ||