Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Fix itf fake peers on demand ordering test #2067

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,56 @@ namespace integration_framework {
// Hint: such calls would precede the derived class construction.
fake_peer_wptr_ = fake_peer;
log_ = std::move(log);
std::weak_ptr<Behaviour> weak_this = shared_from_this();
// subscribe for all messages
subscriptions_.emplace_back(
getFakePeer().getMstStatesObservable().subscribe(
[this](const auto &message) {
this->processMstMessage(message);
[weak_this,
weak_fake_peer = fake_peer_wptr_](const auto &message) {
auto me_alive = weak_this.lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock-and-check code is duplicated 5 times. I think it would be better to refactor it into a shared helper.

auto fake_peer_is_alive = weak_fake_peer.lock();
if (me_alive and fake_peer_is_alive) {
me_alive->processMstMessage(message);
}
}));
subscriptions_.emplace_back(
getFakePeer().getYacStatesObservable().subscribe(
[this](const auto &message) {
this->processYacMessage(message);
[weak_this,
weak_fake_peer = fake_peer_wptr_](const auto &message) {
auto me_alive = weak_this.lock();
auto fake_peer_is_alive = weak_fake_peer.lock();
if (me_alive and fake_peer_is_alive) {
me_alive->processYacMessage(message);
}
}));
subscriptions_.emplace_back(
getFakePeer().getOsBatchesObservable().subscribe(
[this](const auto &batch) { this->processOsBatch(batch); }));
[weak_this, weak_fake_peer = fake_peer_wptr_](const auto &batch) {
auto me_alive = weak_this.lock();
auto fake_peer_is_alive = weak_fake_peer.lock();
if (me_alive and fake_peer_is_alive) {
me_alive->processOsBatch(batch);
}
}));
subscriptions_.emplace_back(
getFakePeer().getOgProposalsObservable().subscribe(
[this](const auto &proposal) {
this->processOgProposal(proposal);
[weak_this,
weak_fake_peer = fake_peer_wptr_](const auto &proposal) {
auto me_alive = weak_this.lock();
auto fake_peer_is_alive = weak_fake_peer.lock();
if (me_alive and fake_peer_is_alive) {
me_alive->processOgProposal(proposal);
}
}));
subscriptions_.emplace_back(
getFakePeer().getBatchesObservable().subscribe(
[this](const auto &batches) {
this->processOrderingBatches(*batches);
[weak_this,
weak_fake_peer = fake_peer_wptr_](const auto &batches) {
auto me_alive = weak_this.lock();
auto fake_peer_is_alive = weak_fake_peer.lock();
if (me_alive and fake_peer_is_alive) {
me_alive->processOrderingBatches(*batches);
}
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace shared_model {
namespace integration_framework {
namespace fake_peer {

class Behaviour {
class Behaviour : public std::enable_shared_from_this<Behaviour> {
public:
virtual ~Behaviour();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ namespace integration_framework {
auto opt_proposal = proposal_storage->getProposal(request);
getLogger()->debug(
"Got an OnDemandOrderingService.GetProposal call for round {}, "
"{} returning a proposal.",
"{}returning a proposal.",
request.toString(),
opt_proposal ? "" : "NOT");
opt_proposal ? "" : "NOT ");
return opt_proposal;
}

Expand Down
8 changes: 7 additions & 1 deletion test/framework/integration_framework/fake_peer/fake_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ namespace integration_framework {
yac_transport_->subscribe(yac_network_notifier_);
}

FakePeer::~FakePeer() {
if (behaviour_) {
behaviour_->absolve();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I can see Behaviour class calls absolve by itself in its destructor. Could you explain why we should do it manually here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do it to ensure it will stop its operation on this peer. The destructor may not be called if the behaviour is managed by a shared pointer stored elsewhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it will be called later anyway, what's the problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not call absolve() here, a behaviour (if it outlives the peer) will try to lock its weak pointer and will not succeed. This line absolves its duties on fake peer exit.

}
}

FakePeer &FakePeer::initialize() {
BOOST_VERIFY_MSG(not initialized_, "Already initialized!");
// here comes the initialization of members requiring shared_from_this()
Expand Down Expand Up @@ -381,7 +387,7 @@ namespace integration_framework {

boost::optional<std::shared_ptr<const shared_model::interface::Proposal>>
FakePeer::sendProposalRequest(iroha::consensus::Round round,
std::chrono::milliseconds timeout) {
std::chrono::milliseconds timeout) const {
auto on_demand_os_transport =
iroha::ordering::transport::OnDemandOsClientGrpcFactory(
async_call_,
Expand Down
4 changes: 3 additions & 1 deletion test/framework/integration_framework/fake_peer/fake_peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ namespace integration_framework {
std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
logger::LoggerManagerTreePtr log_manager);

~FakePeer();

/// Initialization method.
/// \attention Must be called prior to any other instance method (except
/// for constructor).
Expand Down Expand Up @@ -191,7 +193,7 @@ namespace integration_framework {
*/
boost::optional<std::shared_ptr<const shared_model::interface::Proposal>>
sendProposalRequest(iroha::consensus::Round round,
std::chrono::milliseconds timeout);
std::chrono::milliseconds timeout) const;

private:
using MstTransport = iroha::network::MstTransportGrpc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "framework/integration_framework/fake_peer/proposal_storage.hpp"

#include <atomic>
#include <mutex>

#include <boost/range/adaptor/indirected.hpp>
Expand All @@ -18,10 +19,11 @@ namespace integration_framework {
namespace fake_peer {

ProposalStorage::ProposalStorage()
: default_provider_([](auto &) { return boost::none; }),
proposal_factory_(
: proposal_factory_(
std::make_unique<shared_model::proto::ProtoProposalFactory<
shared_model::validation::DefaultProposalValidator>>()) {}
shared_model::validation::DefaultProposalValidator>>()) {
setDefaultProvider([](auto &) { return boost::none; });
}

OrderingProposalRequestResult ProposalStorage::getProposal(
const Round &round) {
Expand All @@ -46,7 +48,7 @@ namespace integration_framework {
}

// finally, use the default
return default_provider_(round);
return getDefaultProposal(round);
}

ProposalStorage &ProposalStorage::storeProposal(
Expand Down Expand Up @@ -114,5 +116,23 @@ namespace integration_framework {

}

ProposalStorage &ProposalStorage::setDefaultProvider(DefaultProvider provider) {
std::atomic_store_explicit(
&default_provider_,
std::make_shared<DefaultProvider>(std::move(provider)),
std::memory_order_release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to add some comments about why memory order here and below is important.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a general case of release-acquire model with nothing specific.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, what will happen if I omit memory_order arguments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right that this optimization is excessive in tests. I believe no noticeable overhead will be added if the default memory order is restored.

return *this;
}

OrderingProposalRequestResult ProposalStorage::getDefaultProposal(
    const Round &round) const {
  /// Produce a proposal for @p round using the currently installed default
  /// provider, or an empty result if none is installed.
  //
  // Take an atomic snapshot of the provider pointer so that a concurrent
  // setDefaultProvider() cannot destroy the provider while we are invoking
  // it. The default (sequentially consistent) memory order is used: this is
  // test-support code, so the stronger ordering costs nothing noticeable and
  // is simpler to reason about than an explicit acquire/release pair.
  auto default_provider = std::atomic_load(&default_provider_);
  if (default_provider) {
    return (*default_provider)(round);
  }
  // No provider installed (normally the constructor sets one) — report an
  // empty result.
  return {};
}

} // namespace fake_peer
} // namespace integration_framework
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,20 @@ namespace integration_framework {
/// with the transactions from internal storage will be returned
void addBatches(const BatchesCollection &batches);

ProposalStorage &setDefaultProvider(DefaultProvider provider);

private:
/// Create a proposal from pending transactions, if any.
boost::optional<std::unique_ptr<Proposal>> makeProposalFromPendingTxs(
shared_model::interface::types::HeightType height);

DefaultProvider default_provider_;
OrderingProposalRequestResult getDefaultProposal(
const Round &round) const;

std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory_;

std::shared_ptr<DefaultProvider> default_provider_;
std::map<Round, std::shared_ptr<const Proposal>> proposals_map_;
mutable std::shared_timed_mutex proposals_map_mutex_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ namespace integration_framework {
return iroha_instance_->getIrohaInstance()->getConsensusGate()->onOutcome();
}

rxcpp::observable<iroha::synchronizer::SynchronizationEvent>
IntegrationTestFramework::getPcsOnCommitObservable() {
  // Expose the peer communication service's commit stream so that tests can
  // observe synchronization (commit) events of the running iroha instance.
  auto iroha = iroha_instance_->getIrohaInstance();
  auto pcs = iroha->getPeerCommunicationService();
  return pcs->on_commit();
}

IntegrationTestFramework &IntegrationTestFramework::getTxStatus(
const shared_model::crypto::Hash &hash,
std::function<void(const shared_model::proto::TransactionResponse &)>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ namespace integration_framework {
rxcpp::observable<iroha::network::ConsensusGate::GateObject>
getYacOnCommitObservable();

rxcpp::observable<iroha::synchronizer::SynchronizationEvent>
getPcsOnCommitObservable();

/**
* Request next status of the transaction
* @param tx_hash is hash for filtering responses
Expand Down
9 changes: 8 additions & 1 deletion test/framework/integration_framework/iroha_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

using namespace std::chrono_literals;

static constexpr std::chrono::milliseconds kMstEmissionPeriod = 100ms;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to move the default ITF constants into a separate header.


namespace integration_framework {

IrohaInstance::IrohaInstance(bool mst_support,
Expand All @@ -39,7 +41,12 @@ namespace integration_framework {
// amount of minutes in a day
mst_expiration_time_(std::chrono::minutes(24 * 60)),
opt_mst_gossip_params_(boost::make_optional(
mst_support, iroha::GossipPropagationStrategyParams{})),
mst_support,
[] {
iroha::GossipPropagationStrategyParams params;
params.emission_period = kMstEmissionPeriod;
return params;
}())),
max_rounds_delay_(0ms),
stale_stream_max_rounds_(2),
irohad_log_manager_(std::move(irohad_log_manager)),
Expand Down
Loading