Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Fix consensus VoteOther case #1834

Merged
merged 6 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions irohad/consensus/gate_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#define CONSENSUS_GATE_OBJECT_HPP

#include <boost/variant.hpp>

#include "consensus/round.hpp"
#include "cryptography/hash.hpp"
#include "cryptography/public_key.hpp"
#include "interfaces/common_objects/types.hpp"

namespace shared_model {
namespace interface {
Expand All @@ -27,7 +29,8 @@ namespace iroha {

/// Network votes for another pair and round
struct VoteOther {
std::shared_ptr<shared_model::interface::Block> block;
shared_model::interface::types::PublicKeyCollectionType public_keys;
shared_model::interface::types::HashType hash;
consensus::Round round;
};

Expand Down
11 changes: 10 additions & 1 deletion irohad/consensus/yac/impl/yac_gate_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "consensus/yac/impl/yac_gate_impl.hpp"

#include <boost/range/adaptor/transformed.hpp>
#include "common/visitor.hpp"
#include "consensus/yac/cluster_order.hpp"
#include "consensus/yac/messages.hpp"
Expand Down Expand Up @@ -119,8 +120,16 @@ namespace iroha {
PairValid{block, current_hash_.vote_round});
}
log_->info("Voted for another block, waiting for sync");
auto public_keys = boost::copy_range<
shared_model::interface::types::PublicKeyCollectionType>(
msg.votes | boost::adaptors::transformed([](auto &vote) {
return vote.signature->publicKey();
}));
auto model_hash = hash_provider_->toModelHash(hash);
return rxcpp::observable<>::just<GateObject>(
VoteOther{current_block_.value(), current_hash_.vote_round});
VoteOther{std::move(public_keys),
std::move(model_hash),
current_hash_.vote_round});
}

rxcpp::observable<YacGateImpl::GateObject> YacGateImpl::handleReject(
Expand Down
36 changes: 13 additions & 23 deletions irohad/synchronizer/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ namespace iroha {
log_->info("processing consensus outcome");
visit_in_place(
object,
[this](const consensus::PairValid &msg) {
this->processNext(msg.block, msg.round);
},
[this](const consensus::PairValid &msg) { this->processNext(msg); },
[this](const consensus::VoteOther &msg) {
this->processDifferent(msg.block, msg.round);
this->processDifferent(msg);
},
[this](const consensus::ProposalReject &msg) {
notifier_.get_subscriber().on_next(SynchronizationEvent{
Expand All @@ -63,17 +61,13 @@ namespace iroha {
}

SynchronizationEvent SynchronizerImpl::downloadMissingBlocks(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round,
const consensus::VoteOther &msg,
std::unique_ptr<ametsuchi::MutableStorage> storage) {
auto hash = commit_message->hash();

// while blocks are not loaded and not committed
while (true) {
// TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
for (const auto &peer_signature : commit_message->signatures()) {
auto network_chain = block_loader_->retrieveBlocks(
shared_model::crypto::PublicKey(peer_signature.publicKey()));
for (const auto &public_key : msg.public_keys) {
auto network_chain = block_loader_->retrieveBlocks(public_key);

std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
network_chain.as_blocking().subscribe(
Expand All @@ -86,11 +80,11 @@ namespace iroha {
auto chain =
rxcpp::observable<>::iterate(blocks, rxcpp::identity_immediate());

if (blocks.back()->hash() == hash
if (blocks.back()->hash() == msg.hash
and validator_->validateAndApply(chain, *storage)) {
mutable_factory_->commit(std::move(storage));

return {chain, SynchronizationOutcomeType::kCommit, round};
return {chain, SynchronizationOutcomeType::kCommit, msg.round};
}
}
}
Expand All @@ -111,27 +105,23 @@ namespace iroha {
->value)};
}

void SynchronizerImpl::processNext(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round) {
void SynchronizerImpl::processNext(const consensus::PairValid &msg) {
log_->info("at handleNext");
auto opt_storage = getStorage();
if (opt_storage == boost::none) {
return;
}
std::unique_ptr<ametsuchi::MutableStorage> storage =
std::move(opt_storage.value());
storage->apply(*commit_message);
storage->apply(*msg.block);
mutable_factory_->commit(std::move(storage));
notifier_.get_subscriber().on_next(
SynchronizationEvent{rxcpp::observable<>::just(commit_message),
SynchronizationEvent{rxcpp::observable<>::just(msg.block),
SynchronizationOutcomeType::kCommit,
round});
msg.round});
}

void SynchronizerImpl::processDifferent(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round) {
void SynchronizerImpl::processDifferent(const consensus::VoteOther &msg) {
log_->info("at handleDifferent");
auto opt_storage = getStorage();
if (opt_storage == boost::none) {
Expand All @@ -140,7 +130,7 @@ namespace iroha {
std::unique_ptr<ametsuchi::MutableStorage> storage =
std::move(opt_storage.value());
SynchronizationEvent result =
downloadMissingBlocks(commit_message, round, std::move(storage));
downloadMissingBlocks(msg, std::move(storage));
notifier_.get_subscriber().on_next(result);
}

Expand Down
14 changes: 5 additions & 9 deletions irohad/synchronizer/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#ifndef IROHA_SYNCHRONIZER_IMPL_HPP
#define IROHA_SYNCHRONIZER_IMPL_HPP

#include "synchronizer/synchronizer.hpp"

#include "ametsuchi/mutable_factory.hpp"
#include "logger/logger.hpp"
#include "network/block_loader.hpp"
#include "network/consensus_gate.hpp"
#include "synchronizer/synchronizer.hpp"
#include "validation/chain_validator.hpp"

namespace iroha {
Expand All @@ -35,16 +36,11 @@ namespace iroha {
* apply the missing blocks
*/
SynchronizationEvent downloadMissingBlocks(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round,
const consensus::VoteOther &msg,
std::unique_ptr<ametsuchi::MutableStorage> storage);

void processNext(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round);
void processDifferent(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round);
void processNext(const consensus::PairValid &msg);
void processDifferent(const consensus::VoteOther &msg);

boost::optional<std::unique_ptr<ametsuchi::MutableStorage>> getStorage();

Expand Down
130 changes: 95 additions & 35 deletions test/module/irohad/consensus/yac/yac_gate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include "consensus/yac/impl/yac_gate_impl.hpp"

#include <memory>

#include <rxcpp/rx.hpp>
#include "consensus/consensus_block_cache.hpp"
#include "consensus/yac/impl/yac_gate_impl.hpp"
#include "consensus/yac/storage/yac_proposal_storage.hpp"
#include "cryptography/crypto_provider/crypto_defaults.hpp"
#include "framework/specified_visitor.hpp"
Expand All @@ -22,7 +23,6 @@ using namespace iroha::network;
using namespace iroha::simulator;
using namespace framework::test_subscriber;
using namespace shared_model::crypto;
using namespace std;
using iroha::consensus::ConsensusResultCache;

using ::testing::_;
Expand Down Expand Up @@ -68,18 +68,21 @@ class YacGateTest : public ::testing::Test {
message.hash = expected_hash;
message.signature = signature;
commit_message = CommitMessage({message});
expected_commit = rxcpp::observable<>::just(Answer(commit_message));
expected_commit = commit_message;

hash_gate = make_unique<MockHashGate>();
peer_orderer = make_unique<MockYacPeerOrderer>();
hash_provider = make_shared<MockYacHashProvider>();
block_creator = make_shared<MockBlockCreator>();
block_cache = make_shared<ConsensusResultCache>();
}
auto hash_gate_ptr = std::make_unique<MockHashGate>();
hash_gate = hash_gate_ptr.get();
auto peer_orderer_ptr = std::make_unique<MockYacPeerOrderer>();
peer_orderer = peer_orderer_ptr.get();
hash_provider = std::make_shared<MockYacHashProvider>();
block_creator = std::make_shared<MockBlockCreator>();
block_cache = std::make_shared<ConsensusResultCache>();

ON_CALL(*block_creator, on_block())
.WillByDefault(Return(block_notifier.get_observable()));

void init() {
gate = std::make_shared<YacGateImpl>(std::move(hash_gate),
std::move(peer_orderer),
gate = std::make_shared<YacGateImpl>(std::move(hash_gate_ptr),
std::move(peer_orderer_ptr),
hash_provider,
block_creator,
block_cache);
Expand All @@ -92,15 +95,18 @@ class YacGateTest : public ::testing::Test {
std::shared_ptr<shared_model::interface::Block> expected_block;
VoteMessage message;
CommitMessage commit_message;
rxcpp::observable<Answer> expected_commit;
Answer expected_commit{commit_message};
rxcpp::subjects::subject<std::shared_ptr<shared_model::interface::Block>>
block_notifier;
rxcpp::subjects::subject<Answer> outcome_notifier;

unique_ptr<MockHashGate> hash_gate;
unique_ptr<MockYacPeerOrderer> peer_orderer;
shared_ptr<MockYacHashProvider> hash_provider;
shared_ptr<MockBlockCreator> block_creator;
shared_ptr<ConsensusResultCache> block_cache;
MockHashGate *hash_gate;
MockYacPeerOrderer *peer_orderer;
std::shared_ptr<MockYacHashProvider> hash_provider;
std::shared_ptr<MockBlockCreator> block_creator;
std::shared_ptr<ConsensusResultCache> block_cache;

shared_ptr<YacGateImpl> gate;
std::shared_ptr<YacGateImpl> gate;

protected:
YacGateTest() : commit_message(std::vector<VoteMessage>{}) {}
Expand All @@ -115,7 +121,8 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
// yac consensus
EXPECT_CALL(*hash_gate, vote(expected_hash, _)).Times(1);

EXPECT_CALL(*hash_gate, onOutcome()).WillOnce(Return(expected_commit));
EXPECT_CALL(*hash_gate, onOutcome())
.WillOnce(Return(outcome_notifier.get_observable()));

// generate order of peers
EXPECT_CALL(*peer_orderer, getOrdering(_))
Expand All @@ -124,11 +131,7 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// make blocks
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::just(expected_block)));

init();
block_notifier.get_subscriber().on_next(expected_block);

// verify that block we voted for is in the cache
auto cache_block = block_cache->get();
Expand All @@ -145,6 +148,8 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
ASSERT_EQ(block, cache_block);
});

outcome_notifier.get_subscriber().on_next(expected_commit);

ASSERT_TRUE(gate_wrapper.validate());
}

Expand All @@ -165,11 +170,7 @@ TEST_F(YacGateTest, YacGateSubscribtionTestFailCase) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// make blocks
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::just(expected_block)));

init();
block_notifier.get_subscriber().on_next(expected_block);
}

/**
Expand All @@ -179,15 +180,74 @@ TEST_F(YacGateTest, YacGateSubscribtionTestFailCase) {
*/
TEST_F(YacGateTest, AgreementOnNone) {
EXPECT_CALL(*hash_gate, vote(_, _)).Times(1);
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::empty<
std::shared_ptr<shared_model::interface::Block>>()));

EXPECT_CALL(*peer_orderer, getOrdering(_))
.WillOnce(Return(ClusterOrdering::create({mk_peer("fake_node")})));

init();

ASSERT_EQ(block_cache->get(), nullptr);

gate->vote(boost::none, boost::none, {});

ASSERT_EQ(block_cache->get(), nullptr);
}

/**
* @given yac gate
* @when voting for one block @and receiving another
* @then yac gate will emit the data of block, for which consensus voted
*/
TEST_F(YacGateTest, DifferentCommit) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// generate order of peers
EXPECT_CALL(*peer_orderer, getOrdering(_))
.WillOnce(Return(ClusterOrdering::create({mk_peer("fake_node")})));

EXPECT_CALL(*hash_gate, vote(expected_hash, _)).Times(1);

block_notifier.get_subscriber().on_next(expected_block);

// create another block, which will be "received", and generate a commit
// message with it
decltype(expected_block) actual_block = std::make_shared<MockBlock>();
Hash actual_hash("actual_hash");
PublicKey actual_pubkey("actual_pubkey");
auto signature = std::make_shared<MockSignature>();
EXPECT_CALL(*signature, publicKey())
.WillRepeatedly(ReturnRefOfCopy(actual_pubkey));

message.hash =
YacHash(iroha::consensus::Round{1, 1}, "actual_proposal", "actual_block");
message.signature = signature;
commit_message = CommitMessage({message});
expected_commit = commit_message;

// yac
EXPECT_CALL(*hash_gate, onOutcome())
.WillOnce(Return(outcome_notifier.get_observable()));

// convert yac hash to model hash
EXPECT_CALL(*hash_provider, toModelHash(message.hash))
.WillOnce(Return(actual_hash));

// verify that block we voted for is in the cache
auto cache_block = block_cache->get();
ASSERT_EQ(cache_block, expected_block);

// verify that yac gate emit expected block
auto gate_wrapper = make_test_subscriber<CallExact>(gate->onOutcome(), 1);
gate_wrapper.subscribe([actual_hash, actual_pubkey](auto outcome) {
auto concrete_outcome = boost::get<iroha::consensus::VoteOther>(outcome);
auto public_keys = concrete_outcome.public_keys;
auto hash = concrete_outcome.hash;

ASSERT_EQ(1, public_keys.size());
ASSERT_EQ(actual_pubkey, public_keys.front());
ASSERT_EQ(hash, actual_hash);
});

outcome_notifier.get_subscriber().on_next(expected_commit);

ASSERT_TRUE(gate_wrapper.validate());
}
Loading