aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
commita78990b412db2c0ead2da8061c4f454f068991d1 (patch)
tree2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_pr.c
parent406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff)
downloadgnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz
gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c93
1 files changed, 42 insertions, 51 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index cd58992c1..f8a7b61f0 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -512,21 +512,17 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
512 512
513/** 513/**
514 * Generate the message corresponding to the given pending request for 514 * Generate the message corresponding to the given pending request for
515 * transmission to other peers (or at least determine its size). 515 * transmission to other peers.
516 * 516 *
517 * @param pr request to generate the message for 517 * @param pr request to generate the message for
518 * @param buf_size number of bytes available in @a buf 518 * @return envelope with the request message
519 * @param buf where to copy the message (can be NULL)
520 * @return number of bytes needed (if `>` @a buf_size) or used
521 */ 519 */
522size_t 520struct GNUNET_MQ_Envelope *
523GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 521GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
524 size_t buf_size, void *buf)
525{ 522{
526 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; 523 struct GNUNET_MQ_Envelope *env;
527 struct GetMessage *gm; 524 struct GetMessage *gm;
528 struct GNUNET_PeerIdentity *ext; 525 struct GNUNET_PeerIdentity *ext;
529 size_t msize;
530 unsigned int k; 526 unsigned int k;
531 uint32_t bm; 527 uint32_t bm;
532 uint32_t prio; 528 uint32_t prio;
@@ -535,11 +531,10 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
535 int64_t ttl; 531 int64_t ttl;
536 int do_route; 532 int do_route;
537 533
538 if (buf_size > 0) 534 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 535 "Building request message for `%s' of type %d\n",
540 "Building request message for `%s' of type %d\n", 536 GNUNET_h2s (&pr->public_data.query),
541 GNUNET_h2s (&pr->public_data.query), 537 pr->public_data.type);
542 pr->public_data.type);
543 k = 0; 538 k = 0;
544 bm = 0; 539 bm = 0;
545 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); 540 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
@@ -559,13 +554,9 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
559 k++; 554 k++;
560 } 555 }
561 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); 556 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
562 msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity); 557 env = GNUNET_MQ_msg_extra (gm,
563 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 558 bf_size + k * sizeof (struct GNUNET_PeerIdentity),
564 if (buf_size < msize) 559 GNUNET_MESSAGE_TYPE_FS_GET);
565 return msize;
566 gm = (struct GetMessage *) lbuf;
567 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
568 gm->header.size = htons (msize);
569 gm->type = htonl (pr->public_data.type); 560 gm->type = htonl (pr->public_data.type);
570 if (do_route) 561 if (do_route)
571 prio = 562 prio =
@@ -585,7 +576,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
585 gm->query = pr->public_data.query; 576 gm->query = pr->public_data.query;
586 ext = (struct GNUNET_PeerIdentity *) &gm[1]; 577 ext = (struct GNUNET_PeerIdentity *) &gm[1];
587 k = 0; 578 k = 0;
588 if (!do_route) 579 if (! do_route)
589 GNUNET_PEER_resolve (pr->sender_pid, 580 GNUNET_PEER_resolve (pr->sender_pid,
590 &ext[k++]); 581 &ext[k++]);
591 if (NULL != pr->public_data.target) 582 if (NULL != pr->public_data.target)
@@ -595,8 +586,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
595 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, 586 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
596 (char *) &ext[k], 587 (char *) &ext[k],
597 bf_size)); 588 bf_size));
598 GNUNET_memcpy (buf, gm, msize); 589 return env;
599 return msize;
600} 590}
601 591
602 592
@@ -1699,18 +1689,14 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1699 * this content and possibly passes it on (to local clients or other 1689 * this content and possibly passes it on (to local clients or other
1700 * peers). Does NOT perform migration (content caching at this peer). 1690 * peers). Does NOT perform migration (content caching at this peer).
1701 * 1691 *
1702 * @param cp the other peer involved (sender or receiver, NULL 1692 * @param cls the other peer involved
1703 * for loopback messages where we are both sender and receiver) 1693 * @param put the actual message
1704 * @param message the actual message
1705 * @return #GNUNET_OK if the message was well-formed,
1706 * #GNUNET_SYSERR if the message was malformed (close connection,
1707 * do not cache under any circumstances)
1708 */ 1694 */
1709int 1695void
1710GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, 1696handle_p2p_put (void *cls,
1711 const struct GNUNET_MessageHeader *message) 1697 const struct PutMessage *put)
1712{ 1698{
1713 const struct PutMessage *put; 1699 struct GSF_ConnectedPeer *cp = cls;
1714 uint16_t msize; 1700 uint16_t msize;
1715 size_t dsize; 1701 size_t dsize;
1716 enum GNUNET_BLOCK_Type type; 1702 enum GNUNET_BLOCK_Type type;
@@ -1721,21 +1707,17 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1721 double putl; 1707 double putl;
1722 struct PutMigrationContext *pmc; 1708 struct PutMigrationContext *pmc;
1723 1709
1724 msize = ntohs (message->size); 1710 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1725 if (msize < sizeof (struct PutMessage)) 1711 "Received P2P PUT from %s\n",
1726 { 1712 GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
1727 GNUNET_break_op (0); 1713 GSF_cover_content_count++;
1728 return GNUNET_SYSERR; 1714 msize = ntohs (put->header.size);
1729 }
1730 put = (const struct PutMessage *) message;
1731 dsize = msize - sizeof (struct PutMessage); 1715 dsize = msize - sizeof (struct PutMessage);
1732 type = ntohl (put->type); 1716 type = ntohl (put->type);
1733 expiration = GNUNET_TIME_absolute_ntoh (put->expiration); 1717 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1734 /* do not allow migrated content to live longer than 1 year */ 1718 /* do not allow migrated content to live longer than 1 year */
1735 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS), 1719 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
1736 expiration); 1720 expiration);
1737 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1738 return GNUNET_SYSERR;
1739 if (GNUNET_OK != 1721 if (GNUNET_OK !=
1740 GNUNET_BLOCK_get_key (GSF_block_ctx, 1722 GNUNET_BLOCK_get_key (GSF_block_ctx,
1741 type, 1723 type,
@@ -1744,7 +1726,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1744 &query)) 1726 &query))
1745 { 1727 {
1746 GNUNET_break_op (0); 1728 GNUNET_break_op (0);
1747 return GNUNET_SYSERR; 1729 return;
1748 } 1730 }
1749 GNUNET_STATISTICS_update (GSF_stats, 1731 GNUNET_STATISTICS_update (GSF_stats,
1750 gettext_noop ("# GAP PUT messages received"), 1732 gettext_noop ("# GAP PUT messages received"),
@@ -1786,11 +1768,19 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1786 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, 1768 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1787 &pmc->origin); 1769 &pmc->origin);
1788 if (NULL == 1770 if (NULL ==
1789 GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type, 1771 GNUNET_DATASTORE_put (GSF_dsh,
1790 prq.priority, 1 /* anonymity */ , 1772 0,
1773 &query,
1774 dsize,
1775 &put[1],
1776 type,
1777 prq.priority,
1778 1 /* anonymity */ ,
1791 0 /* replication */ , 1779 0 /* replication */ ,
1792 expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE, 1780 expiration, 1 + prq.priority,
1793 &put_migration_continuation, pmc)) 1781 MAX_DATASTORE_QUEUE,
1782 &put_migration_continuation,
1783 pmc))
1794 { 1784 {
1795 put_migration_continuation (pmc, 1785 put_migration_continuation (pmc,
1796 GNUNET_SYSERR, 1786 GNUNET_SYSERR,
@@ -1802,7 +1792,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1802 { 1792 {
1803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804 "Choosing not to keep content `%s' (%d/%d)\n", 1794 "Choosing not to keep content `%s' (%d/%d)\n",
1805 GNUNET_h2s (&query), active_to_migration, 1795 GNUNET_h2s (&query),
1796 active_to_migration,
1806 test_put_load_too_high (prq.priority)); 1797 test_put_load_too_high (prq.priority));
1807 } 1798 }
1808 putl = GNUNET_LOAD_get_load (datastore_put_load); 1799 putl = GNUNET_LOAD_get_load (datastore_put_load);
@@ -1826,9 +1817,9 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1826 putl, 1817 putl,
1827 active_to_migration, 1818 active_to_migration,
1828 (GNUNET_NO == prq.request_found)); 1819 (GNUNET_NO == prq.request_found));
1829 GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time)); 1820 GSF_block_peer_migration_ (cp,
1821 GNUNET_TIME_relative_to_absolute (block_time));
1830 } 1822 }
1831 return GNUNET_OK;
1832} 1823}
1833 1824
1834 1825