Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Fix itf fake peers on demand ordering test (#2067)
Browse files Browse the repository at this point in the history
* fix the fake peer ordering test
* reworked ordering test without sync
* reworked synchronization test without sync
* reworked mst test without sync
* absolve behaviour on FakePeer destruction
* fixed fake peer lifetime in behaviour subscriptions
* use the new ProposalStorage iface to add tx
* reduce timeouts
* reduce MST delay
* state alive check in behaviour subscriptions
* removed explicit memory order
* Locker class in Behaviour
* fixated proposal storage in fake peer
* synchronized subjects on_next()

Signed-off-by: Mikhail Boldyrev <[email protected]>
  • Loading branch information
MBoldyrev committed Apr 4, 2019
1 parent 6408767 commit 805447e
Show file tree
Hide file tree
Showing 21 changed files with 236 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,63 @@ namespace integration_framework {
// Hint: such calls would precede the derived class construction.
fake_peer_wptr_ = fake_peer;
log_ = std::move(log);

// Stores weak pointers. Tries to lock them at once.
class Locker {
std::weak_ptr<Behaviour> weak_behaviour_;
std::weak_ptr<FakePeer> weak_fake_peer_;

public:
using Protected =
std::tuple<std::shared_ptr<Behaviour>, std::shared_ptr<FakePeer>>;

Locker(std::weak_ptr<Behaviour> weak_behaviour,
std::weak_ptr<FakePeer> weak_fake_peer)
: weak_behaviour_(std::move(weak_behaviour)),
weak_fake_peer_(std::move(weak_fake_peer)) {}

boost::optional<Protected> protect() const {
Protected p{weak_behaviour_.lock(), weak_fake_peer_.lock()};
return boost::make_optional(std::get<0>(p) and std::get<1>(p), p);
}
};
Locker locker(shared_from_this(), fake_peer);

// subscribe for all messages
subscriptions_.emplace_back(
getFakePeer().getMstStatesObservable().subscribe(
[this](const auto &message) {
this->processMstMessage(message);
[this, locker](const auto &message) {
if (auto protector = locker.protect()) {
this->processMstMessage(message);
}
}));
subscriptions_.emplace_back(
getFakePeer().getYacStatesObservable().subscribe(
[this](const auto &message) {
this->processYacMessage(message);
[this, locker](const auto &message) {
if (auto protector = locker.protect()) {
this->processYacMessage(message);
}
}));
subscriptions_.emplace_back(
getFakePeer().getOsBatchesObservable().subscribe(
[this](const auto &batch) { this->processOsBatch(batch); }));
[this, locker](const auto &batch) {
if (auto protector = locker.protect()) {
this->processOsBatch(batch);
}
}));
subscriptions_.emplace_back(
getFakePeer().getOgProposalsObservable().subscribe(
[this](const auto &proposal) {
this->processOgProposal(proposal);
[this, locker](const auto &proposal) {
if (auto protector = locker.protect()) {
this->processOgProposal(proposal);
}
}));
subscriptions_.emplace_back(
getFakePeer().getBatchesObservable().subscribe(
[this](const auto &batches) {
this->processOrderingBatches(*batches);
[this, locker](const auto &batches) {
if (auto protector = locker.protect()) {
this->processOrderingBatches(*batches);
}
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace shared_model {
namespace integration_framework {
namespace fake_peer {

class Behaviour {
class Behaviour : public std::enable_shared_from_this<Behaviour> {
public:
virtual ~Behaviour();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,13 @@ namespace integration_framework {
OrderingProposalRequestResult
HonestBehaviour::processOrderingProposalRequest(
const OrderingProposalRequest &request) {
const auto proposal_storage = getFakePeer().getProposalStorage();
if (!proposal_storage) {
getLogger()->debug(
"Got an OnDemandOrderingService.GetProposal call for round {}, "
"but have no proposal storage! NOT returning a proposal.",
request.toString());
return boost::none;
}
auto opt_proposal = proposal_storage->getProposal(request);
auto opt_proposal =
getFakePeer().getProposalStorage().getProposal(request);
getLogger()->debug(
"Got an OnDemandOrderingService.GetProposal call for round {}, "
"{} returning a proposal.",
"{}returning a proposal.",
request.toString(),
opt_proposal ? "" : "NOT");
opt_proposal ? "" : "NOT ");
return opt_proposal;
}

Expand All @@ -92,16 +85,6 @@ namespace integration_framework {
return;
}
auto &fake_peer = getFakePeer();
auto proposal_storage = fake_peer.getProposalStorage();
if (!proposal_storage) {
getLogger()->debug(
"Got an OnDemandOrderingService.SendBatches call, but have no "
"proposal storage to store the incoming batches! Creating one.");
fake_peer.setProposalStorage(std::make_shared<ProposalStorage>());
proposal_storage = fake_peer.getProposalStorage();
BOOST_ASSERT_MSG(proposal_storage,
"Failed to create a proposal storage!");
}
getLogger()->debug(
"Got an OnDemandOrderingService.SendBatches call, storing the "
"following batches: {}",
Expand All @@ -111,7 +94,7 @@ namespace integration_framework {
}),
",\n"));

proposal_storage->addBatches(batches);
fake_peer.getProposalStorage().addBatches(batches);
}

} // namespace fake_peer
Expand Down
26 changes: 9 additions & 17 deletions test/framework/integration_framework/fake_peer/fake_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ namespace integration_framework {
yac_transport_->subscribe(yac_network_notifier_);
}

FakePeer::~FakePeer() {
if (behaviour_) {
behaviour_->absolve();
}
}

FakePeer &FakePeer::initialize() {
BOOST_VERIFY_MSG(not initialized_, "Already initialized!");
// here comes the initialization of members requiring shared_from_this()
Expand Down Expand Up @@ -178,22 +184,8 @@ namespace integration_framework {
return boost::none;
}

FakePeer &FakePeer::setProposalStorage(
std::shared_ptr<ProposalStorage> proposal_storage) {
proposal_storage_ = std::move(proposal_storage);
return *this;
}

FakePeer &FakePeer::removeProposalStorage() {
proposal_storage_.reset();
return *this;
}

boost::optional<ProposalStorage &> FakePeer::getProposalStorage() const {
if (proposal_storage_) {
return *proposal_storage_;
}
return boost::none;
ProposalStorage &FakePeer::getProposalStorage() {
return proposal_storage_;
}

void FakePeer::run() {
Expand Down Expand Up @@ -381,7 +373,7 @@ namespace integration_framework {

boost::optional<std::shared_ptr<const shared_model::interface::Proposal>>
FakePeer::sendProposalRequest(iroha::consensus::Round round,
std::chrono::milliseconds timeout) {
std::chrono::milliseconds timeout) const {
auto on_demand_os_transport =
iroha::ordering::transport::OnDemandOsClientGrpcFactory(
async_call_,
Expand Down
14 changes: 6 additions & 8 deletions test/framework/integration_framework/fake_peer/fake_peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <boost/core/noncopyable.hpp>
#include <rxcpp/rx.hpp>
#include "framework/integration_framework/fake_peer/network/mst_message.hpp"
#include "framework/integration_framework/fake_peer/proposal_storage.hpp"
#include "framework/integration_framework/fake_peer/types.hpp"
#include "interfaces/iroha_internal/abstract_transport_factory.hpp"
#include "logger/logger_fwd.hpp"
Expand Down Expand Up @@ -68,6 +69,8 @@ namespace integration_framework {
std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
logger::LoggerManagerTreePtr log_manager);

~FakePeer();

/// Initialization method.
/// \attention Must be called prior to any other instance method (except
/// for constructor).
Expand All @@ -89,12 +92,7 @@ namespace integration_framework {
/// Get the block storage previously assigned to this peer, if any.
boost::optional<const BlockStorage &> getBlockStorage() const;

FakePeer &setProposalStorage(
std::shared_ptr<ProposalStorage> proposal_storage);

FakePeer &removeProposalStorage();

boost::optional<ProposalStorage &> getProposalStorage() const;
ProposalStorage &getProposalStorage();

/// Start the fake peer.
void run();
Expand Down Expand Up @@ -191,7 +189,7 @@ namespace integration_framework {
*/
boost::optional<std::shared_ptr<const shared_model::interface::Proposal>>
sendProposalRequest(iroha::consensus::Round round,
std::chrono::milliseconds timeout);
std::chrono::milliseconds timeout) const;

private:
using MstTransport = iroha::network::MstTransportGrpc;
Expand Down Expand Up @@ -252,7 +250,7 @@ namespace integration_framework {

std::shared_ptr<Behaviour> behaviour_;
std::shared_ptr<BlockStorage> block_storage_;
std::shared_ptr<ProposalStorage> proposal_storage_;
ProposalStorage proposal_storage_;
};

} // namespace fake_peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace integration_framework {
void MstNetworkNotifier::onNewState(
const shared_model::crypto::PublicKey &from,
const iroha::MstState &new_state) {
std::lock_guard<std::mutex> guard(mst_subject_mutex_);
mst_subject_.get_subscriber().on_next(
std::make_shared<MstMessage>(from, new_state));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "network/mst_transport.hpp"

#include <mutex>

#include <rxcpp/rx.hpp>
#include "framework/integration_framework/fake_peer/network/mst_message.hpp"
#include "framework/integration_framework/fake_peer/types.hpp"
Expand All @@ -25,6 +27,7 @@ namespace integration_framework {

private:
rxcpp::subjects::subject<std::shared_ptr<MstMessage>> mst_subject_;
std::mutex mst_subject_mutex_;
};

} // namespace fake_peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace integration_framework {
: fake_peer_wptr_(fake_peer) {}

void OnDemandOsNetworkNotifier::onBatches(CollectionType batches) {
std::lock_guard<std::mutex> guard(batches_subject_mutex_);
batches_subject_.get_subscriber().on_next(
std::make_shared<BatchesCollection>(std::move(batches)));
}
Expand All @@ -26,7 +27,10 @@ namespace integration_framework {
std::shared_ptr<const OnDemandOsNetworkNotifier::ProposalType>>
OnDemandOsNetworkNotifier::onRequestProposal(
iroha::consensus::Round round) {
rounds_subject_.get_subscriber().on_next(round);
{
std::lock_guard<std::mutex> guard(rounds_subject_mutex_);
rounds_subject_.get_subscriber().on_next(round);
}
auto fake_peer = fake_peer_wptr_.lock();
BOOST_ASSERT_MSG(fake_peer, "Fake peer shared pointer is not set!");
const auto behaviour = fake_peer->getBehaviour();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <rxcpp/rx.hpp>

#include <mutex>

#include "consensus/round.hpp"
#include "framework/integration_framework/fake_peer/types.hpp"
#include "ordering/on_demand_os_transport.hpp"
Expand All @@ -34,8 +36,10 @@ namespace integration_framework {
private:
std::weak_ptr<FakePeer> fake_peer_wptr_;
rxcpp::subjects::subject<iroha::consensus::Round> rounds_subject_;
std::mutex rounds_subject_mutex_;
rxcpp::subjects::subject<std::shared_ptr<BatchesCollection>>
batches_subject_;
std::mutex batches_subject_mutex_;
};

} // namespace fake_peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace integration_framework {

void OgNetworkNotifier::onProposal(
std::shared_ptr<shared_model::interface::Proposal> proposal) {
std::lock_guard<std::mutex> guard(proposals_subject_mutex_);
proposals_subject_.get_subscriber().on_next(std::move(proposal));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

#include "network/ordering_gate_transport.hpp"

#include <rxcpp/rx.hpp>
#include <mutex>

#include <rxcpp/rx.hpp>
#include "framework/integration_framework/fake_peer/types.hpp"

namespace integration_framework {
Expand All @@ -28,6 +29,7 @@ namespace integration_framework {
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::Proposal>>
proposals_subject_;
std::mutex proposals_subject_mutex_;
};

} // namespace fake_peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace integration_framework {

void OsNetworkNotifier::onBatch(std::unique_ptr<TransactionBatch> batch) {
std::shared_ptr<TransactionBatch> batch_ptr = std::move(batch);
std::lock_guard<std::mutex> guard(batches_subject_mutex_);
batches_subject_.get_subscriber().on_next(std::move(batch_ptr));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "network/ordering_service_transport.hpp"

#include <mutex>

#include <rxcpp/rx.hpp>
#include "framework/integration_framework/fake_peer/types.hpp"

Expand All @@ -28,6 +30,7 @@ namespace integration_framework {
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::TransactionBatch>>
batches_subject_;
std::mutex batches_subject_mutex_;
};

} // namespace fake_peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace integration_framework {

void YacNetworkNotifier::onState(YacNetworkNotifier::StateMessage state) {
auto state_ptr = std::make_shared<const StateMessage>(std::move(state));
std::lock_guard<std::mutex> guard(votes_subject_mutex_);
votes_subject_.get_subscriber().on_next(state_ptr);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

#include "consensus/yac/transport/yac_network_interface.hpp"

#include <rxcpp/rx.hpp>
#include <mutex>

#include <rxcpp/rx.hpp>
#include "framework/integration_framework/fake_peer/types.hpp"

namespace integration_framework {
Expand All @@ -27,6 +28,7 @@ namespace integration_framework {
private:
rxcpp::subjects::subject<std::shared_ptr<const YacMessage>>
votes_subject_;
std::mutex votes_subject_mutex_;
};

} // namespace fake_peer
Expand Down
Loading

0 comments on commit 805447e

Please sign in to comment.