Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Fix consensus VoteOther case (#1834)
Browse files Browse the repository at this point in the history
* Refactor yac gate test with subject

* Add test case for VoteOther

* Fix VoteOther struct and handling

Signed-off-by: Andrei Lebedev <[email protected]>
  • Loading branch information
lebdron authored Nov 9, 2018
1 parent 3da3c4d commit e37760e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 99 deletions.
7 changes: 5 additions & 2 deletions irohad/consensus/gate_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#define CONSENSUS_GATE_OBJECT_HPP

#include <boost/variant.hpp>

#include "consensus/round.hpp"
#include "cryptography/hash.hpp"
#include "cryptography/public_key.hpp"
#include "interfaces/common_objects/types.hpp"

namespace shared_model {
namespace interface {
Expand All @@ -27,7 +29,8 @@ namespace iroha {

/// Network votes for another pair and round
struct VoteOther {
std::shared_ptr<shared_model::interface::Block> block;
shared_model::interface::types::PublicKeyCollectionType public_keys;
shared_model::interface::types::HashType hash;
consensus::Round round;
};

Expand Down
11 changes: 10 additions & 1 deletion irohad/consensus/yac/impl/yac_gate_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "consensus/yac/impl/yac_gate_impl.hpp"

#include <boost/range/adaptor/transformed.hpp>
#include "common/visitor.hpp"
#include "consensus/yac/cluster_order.hpp"
#include "consensus/yac/messages.hpp"
Expand Down Expand Up @@ -119,8 +120,16 @@ namespace iroha {
PairValid{block, current_hash_.vote_round});
}
log_->info("Voted for another block, waiting for sync");
auto public_keys = boost::copy_range<
shared_model::interface::types::PublicKeyCollectionType>(
msg.votes | boost::adaptors::transformed([](auto &vote) {
return vote.signature->publicKey();
}));
auto model_hash = hash_provider_->toModelHash(hash);
return rxcpp::observable<>::just<GateObject>(
VoteOther{current_block_.value(), current_hash_.vote_round});
VoteOther{std::move(public_keys),
std::move(model_hash),
current_hash_.vote_round});
}

rxcpp::observable<YacGateImpl::GateObject> YacGateImpl::handleReject(
Expand Down
36 changes: 13 additions & 23 deletions irohad/synchronizer/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ namespace iroha {
log_->info("processing consensus outcome");
visit_in_place(
object,
[this](const consensus::PairValid &msg) {
this->processNext(msg.block, msg.round);
},
[this](const consensus::PairValid &msg) { this->processNext(msg); },
[this](const consensus::VoteOther &msg) {
this->processDifferent(msg.block, msg.round);
this->processDifferent(msg);
},
[this](const consensus::ProposalReject &msg) {
notifier_.get_subscriber().on_next(SynchronizationEvent{
Expand All @@ -63,17 +61,13 @@ namespace iroha {
}

SynchronizationEvent SynchronizerImpl::downloadMissingBlocks(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round,
const consensus::VoteOther &msg,
std::unique_ptr<ametsuchi::MutableStorage> storage) {
auto hash = commit_message->hash();

// while blocks are not loaded and not committed
while (true) {
// TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
for (const auto &peer_signature : commit_message->signatures()) {
auto network_chain = block_loader_->retrieveBlocks(
shared_model::crypto::PublicKey(peer_signature.publicKey()));
for (const auto &public_key : msg.public_keys) {
auto network_chain = block_loader_->retrieveBlocks(public_key);

std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
network_chain.as_blocking().subscribe(
Expand All @@ -86,11 +80,11 @@ namespace iroha {
auto chain =
rxcpp::observable<>::iterate(blocks, rxcpp::identity_immediate());

if (blocks.back()->hash() == hash
if (blocks.back()->hash() == msg.hash
and validator_->validateAndApply(chain, *storage)) {
mutable_factory_->commit(std::move(storage));

return {chain, SynchronizationOutcomeType::kCommit, round};
return {chain, SynchronizationOutcomeType::kCommit, msg.round};
}
}
}
Expand All @@ -111,27 +105,23 @@ namespace iroha {
->value)};
}

void SynchronizerImpl::processNext(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round) {
void SynchronizerImpl::processNext(const consensus::PairValid &msg) {
log_->info("at handleNext");
auto opt_storage = getStorage();
if (opt_storage == boost::none) {
return;
}
std::unique_ptr<ametsuchi::MutableStorage> storage =
std::move(opt_storage.value());
storage->apply(*commit_message);
storage->apply(*msg.block);
mutable_factory_->commit(std::move(storage));
notifier_.get_subscriber().on_next(
SynchronizationEvent{rxcpp::observable<>::just(commit_message),
SynchronizationEvent{rxcpp::observable<>::just(msg.block),
SynchronizationOutcomeType::kCommit,
round});
msg.round});
}

void SynchronizerImpl::processDifferent(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round) {
void SynchronizerImpl::processDifferent(const consensus::VoteOther &msg) {
log_->info("at handleDifferent");
auto opt_storage = getStorage();
if (opt_storage == boost::none) {
Expand All @@ -140,7 +130,7 @@ namespace iroha {
std::unique_ptr<ametsuchi::MutableStorage> storage =
std::move(opt_storage.value());
SynchronizationEvent result =
downloadMissingBlocks(commit_message, round, std::move(storage));
downloadMissingBlocks(msg, std::move(storage));
notifier_.get_subscriber().on_next(result);
}

Expand Down
14 changes: 5 additions & 9 deletions irohad/synchronizer/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#ifndef IROHA_SYNCHRONIZER_IMPL_HPP
#define IROHA_SYNCHRONIZER_IMPL_HPP

#include "synchronizer/synchronizer.hpp"

#include "ametsuchi/mutable_factory.hpp"
#include "logger/logger.hpp"
#include "network/block_loader.hpp"
#include "network/consensus_gate.hpp"
#include "synchronizer/synchronizer.hpp"
#include "validation/chain_validator.hpp"

namespace iroha {
Expand All @@ -35,16 +36,11 @@ namespace iroha {
* apply the missing blocks
*/
SynchronizationEvent downloadMissingBlocks(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round,
const consensus::VoteOther &msg,
std::unique_ptr<ametsuchi::MutableStorage> storage);

void processNext(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round);
void processDifferent(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round);
void processNext(const consensus::PairValid &msg);
void processDifferent(const consensus::VoteOther &msg);

boost::optional<std::unique_ptr<ametsuchi::MutableStorage>> getStorage();

Expand Down
130 changes: 95 additions & 35 deletions test/module/irohad/consensus/yac/yac_gate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include "consensus/yac/impl/yac_gate_impl.hpp"

#include <memory>

#include <rxcpp/rx.hpp>
#include "consensus/consensus_block_cache.hpp"
#include "consensus/yac/impl/yac_gate_impl.hpp"
#include "consensus/yac/storage/yac_proposal_storage.hpp"
#include "cryptography/crypto_provider/crypto_defaults.hpp"
#include "framework/specified_visitor.hpp"
Expand All @@ -22,7 +23,6 @@ using namespace iroha::network;
using namespace iroha::simulator;
using namespace framework::test_subscriber;
using namespace shared_model::crypto;
using namespace std;
using iroha::consensus::ConsensusResultCache;

using ::testing::_;
Expand Down Expand Up @@ -68,18 +68,21 @@ class YacGateTest : public ::testing::Test {
message.hash = expected_hash;
message.signature = signature;
commit_message = CommitMessage({message});
expected_commit = rxcpp::observable<>::just(Answer(commit_message));
expected_commit = commit_message;

hash_gate = make_unique<MockHashGate>();
peer_orderer = make_unique<MockYacPeerOrderer>();
hash_provider = make_shared<MockYacHashProvider>();
block_creator = make_shared<MockBlockCreator>();
block_cache = make_shared<ConsensusResultCache>();
}
auto hash_gate_ptr = std::make_unique<MockHashGate>();
hash_gate = hash_gate_ptr.get();
auto peer_orderer_ptr = std::make_unique<MockYacPeerOrderer>();
peer_orderer = peer_orderer_ptr.get();
hash_provider = std::make_shared<MockYacHashProvider>();
block_creator = std::make_shared<MockBlockCreator>();
block_cache = std::make_shared<ConsensusResultCache>();

ON_CALL(*block_creator, on_block())
.WillByDefault(Return(block_notifier.get_observable()));

void init() {
gate = std::make_shared<YacGateImpl>(std::move(hash_gate),
std::move(peer_orderer),
gate = std::make_shared<YacGateImpl>(std::move(hash_gate_ptr),
std::move(peer_orderer_ptr),
hash_provider,
block_creator,
block_cache);
Expand All @@ -92,15 +95,18 @@ class YacGateTest : public ::testing::Test {
std::shared_ptr<shared_model::interface::Block> expected_block;
VoteMessage message;
CommitMessage commit_message;
rxcpp::observable<Answer> expected_commit;
Answer expected_commit{commit_message};
rxcpp::subjects::subject<std::shared_ptr<shared_model::interface::Block>>
block_notifier;
rxcpp::subjects::subject<Answer> outcome_notifier;

unique_ptr<MockHashGate> hash_gate;
unique_ptr<MockYacPeerOrderer> peer_orderer;
shared_ptr<MockYacHashProvider> hash_provider;
shared_ptr<MockBlockCreator> block_creator;
shared_ptr<ConsensusResultCache> block_cache;
MockHashGate *hash_gate;
MockYacPeerOrderer *peer_orderer;
std::shared_ptr<MockYacHashProvider> hash_provider;
std::shared_ptr<MockBlockCreator> block_creator;
std::shared_ptr<ConsensusResultCache> block_cache;

shared_ptr<YacGateImpl> gate;
std::shared_ptr<YacGateImpl> gate;

protected:
YacGateTest() : commit_message(std::vector<VoteMessage>{}) {}
Expand All @@ -115,7 +121,8 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
// yac consensus
EXPECT_CALL(*hash_gate, vote(expected_hash, _)).Times(1);

EXPECT_CALL(*hash_gate, onOutcome()).WillOnce(Return(expected_commit));
EXPECT_CALL(*hash_gate, onOutcome())
.WillOnce(Return(outcome_notifier.get_observable()));

// generate order of peers
EXPECT_CALL(*peer_orderer, getOrdering(_))
Expand All @@ -124,11 +131,7 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// make blocks
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::just(expected_block)));

init();
block_notifier.get_subscriber().on_next(expected_block);

// verify that block we voted for is in the cache
auto cache_block = block_cache->get();
Expand All @@ -145,6 +148,8 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
ASSERT_EQ(block, cache_block);
});

outcome_notifier.get_subscriber().on_next(expected_commit);

ASSERT_TRUE(gate_wrapper.validate());
}

Expand All @@ -165,11 +170,7 @@ TEST_F(YacGateTest, YacGateSubscribtionTestFailCase) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// make blocks
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::just(expected_block)));

init();
block_notifier.get_subscriber().on_next(expected_block);
}

/**
Expand All @@ -179,15 +180,74 @@ TEST_F(YacGateTest, YacGateSubscribtionTestFailCase) {
*/
TEST_F(YacGateTest, AgreementOnNone) {
EXPECT_CALL(*hash_gate, vote(_, _)).Times(1);
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::empty<
std::shared_ptr<shared_model::interface::Block>>()));

EXPECT_CALL(*peer_orderer, getOrdering(_))
.WillOnce(Return(ClusterOrdering::create({mk_peer("fake_node")})));

init();

ASSERT_EQ(block_cache->get(), nullptr);

gate->vote(boost::none, boost::none, {});

ASSERT_EQ(block_cache->get(), nullptr);
}

/**
* @given yac gate
* @when voting for one block @and receiving another
* @then yac gate will emit the data of block, for which consensus voted
*/
TEST_F(YacGateTest, DifferentCommit) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// generate order of peers
EXPECT_CALL(*peer_orderer, getOrdering(_))
.WillOnce(Return(ClusterOrdering::create({mk_peer("fake_node")})));

EXPECT_CALL(*hash_gate, vote(expected_hash, _)).Times(1);

block_notifier.get_subscriber().on_next(expected_block);

// create another block, which will be "received", and generate a commit
// message with it
decltype(expected_block) actual_block = std::make_shared<MockBlock>();
Hash actual_hash("actual_hash");
PublicKey actual_pubkey("actual_pubkey");
auto signature = std::make_shared<MockSignature>();
EXPECT_CALL(*signature, publicKey())
.WillRepeatedly(ReturnRefOfCopy(actual_pubkey));

message.hash =
YacHash(iroha::consensus::Round{1, 1}, "actual_proposal", "actual_block");
message.signature = signature;
commit_message = CommitMessage({message});
expected_commit = commit_message;

// yac
EXPECT_CALL(*hash_gate, onOutcome())
.WillOnce(Return(outcome_notifier.get_observable()));

// convert yac hash to model hash
EXPECT_CALL(*hash_provider, toModelHash(message.hash))
.WillOnce(Return(actual_hash));

// verify that block we voted for is in the cache
auto cache_block = block_cache->get();
ASSERT_EQ(cache_block, expected_block);

// verify that yac gate emit expected block
auto gate_wrapper = make_test_subscriber<CallExact>(gate->onOutcome(), 1);
gate_wrapper.subscribe([actual_hash, actual_pubkey](auto outcome) {
auto concrete_outcome = boost::get<iroha::consensus::VoteOther>(outcome);
auto public_keys = concrete_outcome.public_keys;
auto hash = concrete_outcome.hash;

ASSERT_EQ(1, public_keys.size());
ASSERT_EQ(actual_pubkey, public_keys.front());
ASSERT_EQ(hash, actual_hash);
});

outcome_notifier.get_subscriber().on_next(expected_commit);

ASSERT_TRUE(gate_wrapper.validate());
}
Loading

0 comments on commit e37760e

Please sign in to comment.