Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Fix consensus VoteOther case #1834

Merged
merged 6 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions irohad/consensus/gate_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#define CONSENSUS_GATE_OBJECT_HPP

#include <boost/variant.hpp>

#include "consensus/round.hpp"
#include "cryptography/hash.hpp"
#include "cryptography/public_key.hpp"
#include "interfaces/common_objects/types.hpp"

namespace shared_model {
namespace interface {
Expand All @@ -27,7 +29,8 @@ namespace iroha {

/// Network votes for another pair and round
struct VoteOther {
std::shared_ptr<shared_model::interface::Block> block;
shared_model::interface::types::PublicKeyCollectionType public_keys;
shared_model::interface::types::HashType hash;
consensus::Round round;
};

Expand Down
13 changes: 12 additions & 1 deletion irohad/consensus/yac/impl/yac_gate_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,19 @@ namespace iroha {
PairValid{block, current_hash_.vote_round});
}
log_->info("Voted for another block, waiting for sync");
auto public_keys = std::accumulate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use transform instead?

msg.votes.begin(),
msg.votes.end(),
shared_model::interface::types::PublicKeyCollectionType{},
[](auto &acc, const auto &vote) {
acc.push_back(vote.signature->publicKey());
return acc;
});
auto model_hash = hash_provider_->toModelHash(hash);
return rxcpp::observable<>::just<GateObject>(
VoteOther{current_block_.value(), current_hash_.vote_round});
VoteOther{std::move(public_keys),
std::move(model_hash),
current_hash_.vote_round});
}

rxcpp::observable<YacGateImpl::GateObject> YacGateImpl::handleReject(
Expand Down
19 changes: 10 additions & 9 deletions irohad/synchronizer/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace iroha {
this->processNext(msg.block, msg.round);
},
[this](const consensus::VoteOther &msg) {
this->processDifferent(msg.block, msg.round);
this->processDifferent(msg.public_keys, msg.hash, msg.round);
},
[this](const consensus::ProposalReject &msg) {
notifier_.get_subscriber().on_next(SynchronizationEvent{
Expand All @@ -63,17 +63,16 @@ namespace iroha {
}

SynchronizationEvent SynchronizerImpl::downloadMissingBlocks(
std::shared_ptr<shared_model::interface::Block> commit_message,
const shared_model::interface::types::PublicKeyCollectionType
&public_keys,
const shared_model::interface::types::HashType &hash,
const consensus::Round &round,
std::unique_ptr<ametsuchi::MutableStorage> storage) {
auto hash = commit_message->hash();

// while blocks are not loaded and not committed
while (true) {
// TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
for (const auto &peer_signature : commit_message->signatures()) {
auto network_chain = block_loader_->retrieveBlocks(
shared_model::crypto::PublicKey(peer_signature.publicKey()));
for (const auto &public_key : public_keys) {
auto network_chain = block_loader_->retrieveBlocks(public_key);

std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
network_chain.as_blocking().subscribe(
Expand Down Expand Up @@ -130,7 +129,9 @@ namespace iroha {
}

void SynchronizerImpl::processDifferent(
std::shared_ptr<shared_model::interface::Block> commit_message,
const shared_model::interface::types::PublicKeyCollectionType
&public_keys,
const shared_model::interface::types::HashType &hash,
const consensus::Round &round) {
log_->info("at handleDifferent");
auto opt_storage = getStorage();
Expand All @@ -140,7 +141,7 @@ namespace iroha {
std::unique_ptr<ametsuchi::MutableStorage> storage =
std::move(opt_storage.value());
SynchronizationEvent result =
downloadMissingBlocks(commit_message, round, std::move(storage));
downloadMissingBlocks(public_keys, hash, round, std::move(storage));
notifier_.get_subscriber().on_next(result);
}

Expand Down
11 changes: 8 additions & 3 deletions irohad/synchronizer/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#ifndef IROHA_SYNCHRONIZER_IMPL_HPP
#define IROHA_SYNCHRONIZER_IMPL_HPP

#include "synchronizer/synchronizer.hpp"

#include "ametsuchi/mutable_factory.hpp"
#include "logger/logger.hpp"
#include "network/block_loader.hpp"
#include "network/consensus_gate.hpp"
#include "synchronizer/synchronizer.hpp"
#include "validation/chain_validator.hpp"

namespace iroha {
Expand All @@ -35,15 +36,19 @@ namespace iroha {
* apply the missing blocks
*/
SynchronizationEvent downloadMissingBlocks(
std::shared_ptr<shared_model::interface::Block> commit_message,
const shared_model::interface::types::PublicKeyCollectionType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe pass VoteOther to avoid specifying all parameters in functions?

&public_keys,
const shared_model::interface::types::HashType &hash,
const consensus::Round &round,
std::unique_ptr<ametsuchi::MutableStorage> storage);

void processNext(
std::shared_ptr<shared_model::interface::Block> commit_message,
const consensus::Round &round);
void processDifferent(
std::shared_ptr<shared_model::interface::Block> commit_message,
const shared_model::interface::types::PublicKeyCollectionType
&public_keys,
const shared_model::interface::types::HashType &hash,
const consensus::Round &round);

boost::optional<std::unique_ptr<ametsuchi::MutableStorage>> getStorage();
Expand Down
115 changes: 88 additions & 27 deletions test/module/irohad/consensus/yac/yac_gate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include "consensus/yac/impl/yac_gate_impl.hpp"

#include <memory>

#include <rxcpp/rx.hpp>
#include "consensus/consensus_block_cache.hpp"
#include "consensus/yac/impl/yac_gate_impl.hpp"
#include "consensus/yac/storage/yac_proposal_storage.hpp"
#include "cryptography/crypto_provider/crypto_defaults.hpp"
#include "framework/specified_visitor.hpp"
Expand Down Expand Up @@ -68,18 +69,21 @@ class YacGateTest : public ::testing::Test {
message.hash = expected_hash;
message.signature = signature;
commit_message = CommitMessage({message});
expected_commit = rxcpp::observable<>::just(Answer(commit_message));
expected_commit = commit_message;

hash_gate = make_unique<MockHashGate>();
peer_orderer = make_unique<MockYacPeerOrderer>();
auto hash_gate_ptr = make_unique<MockHashGate>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using namespace std
😱 (not a pr issue)

hash_gate = hash_gate_ptr.get();
auto peer_orderer_ptr = make_unique<MockYacPeerOrderer>();
peer_orderer = peer_orderer_ptr.get();
hash_provider = make_shared<MockYacHashProvider>();
block_creator = make_shared<MockBlockCreator>();
block_cache = make_shared<ConsensusResultCache>();
}

void init() {
gate = std::make_shared<YacGateImpl>(std::move(hash_gate),
std::move(peer_orderer),
EXPECT_CALL(*block_creator, on_block())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use ON_CALL so that we do not set any unnecessary invariants for tests?

.WillOnce(Return(block_notifier.get_observable()));

gate = std::make_shared<YacGateImpl>(std::move(hash_gate_ptr),
std::move(peer_orderer_ptr),
hash_provider,
block_creator,
block_cache);
Expand All @@ -92,10 +96,13 @@ class YacGateTest : public ::testing::Test {
std::shared_ptr<shared_model::interface::Block> expected_block;
VoteMessage message;
CommitMessage commit_message;
rxcpp::observable<Answer> expected_commit;
Answer expected_commit{commit_message};
rxcpp::subjects::subject<std::shared_ptr<shared_model::interface::Block>>
block_notifier;
rxcpp::subjects::subject<Answer> outcome_notifier;

unique_ptr<MockHashGate> hash_gate;
unique_ptr<MockYacPeerOrderer> peer_orderer;
MockHashGate *hash_gate;
MockYacPeerOrderer *peer_orderer;
shared_ptr<MockYacHashProvider> hash_provider;
shared_ptr<MockBlockCreator> block_creator;
shared_ptr<ConsensusResultCache> block_cache;
Expand All @@ -115,7 +122,8 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
// yac consensus
EXPECT_CALL(*hash_gate, vote(expected_hash, _)).Times(1);

EXPECT_CALL(*hash_gate, onOutcome()).WillOnce(Return(expected_commit));
EXPECT_CALL(*hash_gate, onOutcome())
.WillOnce(Return(outcome_notifier.get_observable()));

// generate order of peers
EXPECT_CALL(*peer_orderer, getOrdering(_))
Expand All @@ -124,11 +132,7 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// make blocks
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::just(expected_block)));

init();
block_notifier.get_subscriber().on_next(expected_block);

// verify that block we voted for is in the cache
auto cache_block = block_cache->get();
Expand All @@ -145,6 +149,8 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {
ASSERT_EQ(block, cache_block);
});

outcome_notifier.get_subscriber().on_next(expected_commit);

ASSERT_TRUE(gate_wrapper.validate());
}

Expand All @@ -165,11 +171,7 @@ TEST_F(YacGateTest, YacGateSubscribtionTestFailCase) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// make blocks
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::just(expected_block)));

init();
block_notifier.get_subscriber().on_next(expected_block);
}

/**
Expand All @@ -179,15 +181,74 @@ TEST_F(YacGateTest, YacGateSubscribtionTestFailCase) {
*/
TEST_F(YacGateTest, AgreementOnNone) {
EXPECT_CALL(*hash_gate, vote(_, _)).Times(1);
EXPECT_CALL(*block_creator, on_block())
.WillOnce(Return(rxcpp::observable<>::empty<
std::shared_ptr<shared_model::interface::Block>>()));

EXPECT_CALL(*peer_orderer, getOrdering(_))
.WillOnce(Return(ClusterOrdering::create({mk_peer("fake_node")})));

init();

ASSERT_EQ(block_cache->get(), nullptr);

gate->vote(boost::none, boost::none, {});

ASSERT_EQ(block_cache->get(), nullptr);
}

/**
* @given yac gate
* @when voting for one block @and receiving another
* @then yac gate will emit the data of block, for which consensus voted
*/
TEST_F(YacGateTest, DifferentCommit) {
// make hash from block
EXPECT_CALL(*hash_provider, makeHash(_)).WillOnce(Return(expected_hash));

// generate order of peers
EXPECT_CALL(*peer_orderer, getOrdering(_))
.WillOnce(Return(ClusterOrdering::create({mk_peer("fake_node")})));

EXPECT_CALL(*hash_gate, vote(expected_hash, _)).Times(1);

block_notifier.get_subscriber().on_next(expected_block);

// create another block, which will be "received", and generate a commit
// message with it
decltype(expected_block) actual_block = std::make_shared<MockBlock>();
Hash actual_hash("actual_hash");
PublicKey actual_pubkey("actual_pubkey");
auto signature = std::make_shared<MockSignature>();
EXPECT_CALL(*signature, publicKey())
.WillRepeatedly(ReturnRefOfCopy(actual_pubkey));

message.hash =
YacHash(iroha::consensus::Round{1, 1}, "actual_proposal", "actual_block");
message.signature = signature;
commit_message = CommitMessage({message});
expected_commit = commit_message;

// yac
EXPECT_CALL(*hash_gate, onOutcome())
.WillOnce(Return(outcome_notifier.get_observable()));

// convert yac hash to model hash
EXPECT_CALL(*hash_provider, toModelHash(message.hash))
.WillOnce(Return(actual_hash));

// verify that block we voted for is in the cache
auto cache_block = block_cache->get();
ASSERT_EQ(cache_block, expected_block);

// verify that yac gate emit expected block
auto gate_wrapper = make_test_subscriber<CallExact>(gate->onOutcome(), 1);
gate_wrapper.subscribe([actual_hash, actual_pubkey](auto outcome) {
auto concete_outcome = boost::get<iroha::consensus::VoteOther>(outcome);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in variable name?

auto public_keys = concete_outcome.public_keys;
auto hash = concete_outcome.hash;

ASSERT_EQ(1, public_keys.size());
ASSERT_EQ(actual_pubkey, public_keys.front());
ASSERT_EQ(hash, actual_hash);
});

outcome_notifier.get_subscriber().on_next(expected_commit);

ASSERT_TRUE(gate_wrapper.validate());
}
Loading