Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Improve and make delay func pluggable to ordering gate init #1997

Merged
merged 2 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,31 @@ void Irohad::initOrderingGate() {
auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
shared_model::validation::DefaultProposalValidator>>();

const uint64_t kCounter = 0, kMaxLocalCounter = 2;
// reject_counter and local_counter are local mutable variables of lambda
const uint64_t kMaxDelaySeconds = 5;
auto delay = [reject_counter = kCounter,
local_counter = kCounter,
// MSVC requires const variables to be captured
kMaxDelaySeconds,
kMaxLocalCounter](const auto &commit) mutable {
using iroha::synchronizer::SynchronizationOutcomeType;
if (commit.sync_outcome == SynchronizationOutcomeType::kReject
or commit.sync_outcome == SynchronizationOutcomeType::kNothing) {
// Increment reject_counter each local_counter calls of function
++local_counter;
if (local_counter == kMaxLocalCounter) {
local_counter = 0;
if (reject_counter < kMaxDelaySeconds) {
reject_counter++;
}
}
} else {
reject_counter = 0;
}
return std::chrono::seconds(reject_counter);
};

ordering_gate = ordering_init.initOrderingGate(max_proposal_size_,
proposal_delay_,
std::move(hashes),
Expand All @@ -277,7 +302,8 @@ void Irohad::initOrderingGate() {
async_call_,
std::move(factory),
persistent_cache,
{blocks.back()->height(), 1});
{blocks.back()->height(), 1},
delay);
log_->info("[Init] => init ordering gate - [{}]",
logger::logBool(ordering_gate));
}
Expand Down
41 changes: 9 additions & 32 deletions irohad/main/impl/on_demand_ordering_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,35 +184,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round) {
// TODO andrei 06.12.18 IR-75 Make counter and generator parametrizable
const uint64_t kCounter = 0, kMaxLocalCounter = 2;
auto time_generator = [](auto reject_counter) {
return std::chrono::seconds(reject_counter);
};
// reject_counter and local_counter are local mutable variables of lambda
auto delay = [reject_counter = kCounter,
local_counter = kCounter,
&time_generator,
// MSVC requires const variables to be captured
kMaxLocalCounter](const auto &commit) mutable {
using iroha::synchronizer::SynchronizationOutcomeType;
if (commit.sync_outcome == SynchronizationOutcomeType::kReject
or commit.sync_outcome == SynchronizationOutcomeType::kNothing) {
// Increment reject_counter each local_counter calls of function
++local_counter;
if (local_counter == kMaxLocalCounter) {
local_counter = 0;
if (reject_counter
< std::numeric_limits<decltype(reject_counter)>::max()) {
reject_counter++;
}
}
} else {
reject_counter = 0;
}
return time_generator(reject_counter);
};
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func) {

auto map = [](auto commit) {
return matchEvent(
Expand Down Expand Up @@ -251,7 +225,7 @@ namespace iroha {
notifier.get_observable()
.lift<iroha::synchronizer::SynchronizationEvent>(
iroha::makeDelay<iroha::synchronizer::SynchronizationEvent>(
delay, rxcpp::identity_current_thread()))
delay_func, rxcpp::identity_current_thread()))
.map(map),
std::move(cache),
std::move(proposal_factory),
Expand Down Expand Up @@ -290,7 +264,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round) {
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func) {
auto ordering_service =
createService(max_size, proposal_factory, tx_cache);
service = std::make_shared<ordering::transport::OnDemandOsServerGrpc>(
Expand All @@ -306,7 +282,8 @@ namespace iroha {
std::make_shared<ordering::cache::OnDemandCache>(),
std::move(proposal_factory),
std::move(tx_cache),
initial_round);
initial_round,
std::move(delay_func));
}

} // namespace network
Expand Down
8 changes: 6 additions & 2 deletions irohad/main/impl/on_demand_ordering_init.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round);
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func);

/**
* Creates on-demand ordering service. \see initOrderingGate for
Expand Down Expand Up @@ -116,7 +118,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round);
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func);

/// gRPC service for ordering service
std::shared_ptr<ordering::proto::OnDemandOrdering::Service> service;
Expand Down