Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Improve and make delay func pluggable to ordering gate init (#1997)
Browse files Browse the repository at this point in the history
Signed-off-by: Igor Egorov <[email protected]>
  • Loading branch information
Igor Egorov authored Dec 28, 2018
1 parent 14f8334 commit 0e645f4
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 35 deletions.
28 changes: 27 additions & 1 deletion irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,31 @@ void Irohad::initOrderingGate() {
auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
shared_model::validation::DefaultProposalValidator>>();

const uint64_t kCounter = 0, kMaxLocalCounter = 2;
// reject_counter and local_counter are local mutable variables of lambda
const uint64_t kMaxDelaySeconds = 5;
auto delay = [reject_counter = kCounter,
local_counter = kCounter,
// MSVC requires const variables to be captured
kMaxDelaySeconds,
kMaxLocalCounter](const auto &commit) mutable {
using iroha::synchronizer::SynchronizationOutcomeType;
if (commit.sync_outcome == SynchronizationOutcomeType::kReject
or commit.sync_outcome == SynchronizationOutcomeType::kNothing) {
// Increment reject_counter each local_counter calls of function
++local_counter;
if (local_counter == kMaxLocalCounter) {
local_counter = 0;
if (reject_counter < kMaxDelaySeconds) {
reject_counter++;
}
}
} else {
reject_counter = 0;
}
return std::chrono::seconds(reject_counter);
};

ordering_gate = ordering_init.initOrderingGate(max_proposal_size_,
proposal_delay_,
std::move(hashes),
Expand All @@ -277,7 +302,8 @@ void Irohad::initOrderingGate() {
async_call_,
std::move(factory),
persistent_cache,
{blocks.back()->height(), 1});
{blocks.back()->height(), 1},
delay);
log_->info("[Init] => init ordering gate - [{}]",
logger::logBool(ordering_gate));
}
Expand Down
41 changes: 9 additions & 32 deletions irohad/main/impl/on_demand_ordering_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,35 +184,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round) {
// TODO andrei 06.12.18 IR-75 Make counter and generator parametrizable
const uint64_t kCounter = 0, kMaxLocalCounter = 2;
auto time_generator = [](auto reject_counter) {
return std::chrono::seconds(reject_counter);
};
// reject_counter and local_counter are local mutable variables of lambda
auto delay = [reject_counter = kCounter,
local_counter = kCounter,
&time_generator,
// MSVC requires const variables to be captured
kMaxLocalCounter](const auto &commit) mutable {
using iroha::synchronizer::SynchronizationOutcomeType;
if (commit.sync_outcome == SynchronizationOutcomeType::kReject
or commit.sync_outcome == SynchronizationOutcomeType::kNothing) {
// Increment reject_counter each local_counter calls of function
++local_counter;
if (local_counter == kMaxLocalCounter) {
local_counter = 0;
if (reject_counter
< std::numeric_limits<decltype(reject_counter)>::max()) {
reject_counter++;
}
}
} else {
reject_counter = 0;
}
return time_generator(reject_counter);
};
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func) {

auto map = [](auto commit) {
return matchEvent(
Expand Down Expand Up @@ -251,7 +225,7 @@ namespace iroha {
notifier.get_observable()
.lift<iroha::synchronizer::SynchronizationEvent>(
iroha::makeDelay<iroha::synchronizer::SynchronizationEvent>(
delay, rxcpp::identity_current_thread()))
delay_func, rxcpp::identity_current_thread()))
.map(map),
std::move(cache),
std::move(proposal_factory),
Expand Down Expand Up @@ -290,7 +264,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round) {
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func) {
auto ordering_service =
createService(max_size, proposal_factory, tx_cache);
service = std::make_shared<ordering::transport::OnDemandOsServerGrpc>(
Expand All @@ -306,7 +282,8 @@ namespace iroha {
std::make_shared<ordering::cache::OnDemandCache>(),
std::move(proposal_factory),
std::move(tx_cache),
initial_round);
initial_round,
std::move(delay_func));
}

} // namespace network
Expand Down
8 changes: 6 additions & 2 deletions irohad/main/impl/on_demand_ordering_init.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round);
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func);

/**
* Creates on-demand ordering service. \see initOrderingGate for
Expand Down Expand Up @@ -116,7 +118,9 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round);
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func);

/// gRPC service for ordering service
std::shared_ptr<ordering::proto::OnDemandOrdering::Service> service;
Expand Down

0 comments on commit 0e645f4

Please sign in to comment.