Skip to content

Commit

Permalink
[RDB] block store
Browse files Browse the repository at this point in the history
/build before-merge

Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer committed Aug 21, 2021
1 parent 5f731a2 commit 6dd0a32
Show file tree
Hide file tree
Showing 11 changed files with 529 additions and 15 deletions.
1 change: 0 additions & 1 deletion example/config.sample
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{
"block_store_path" : "/tmp/block_store/",
"torii_port" : 50051,
"internal_port" : 10001,
"database": {
Expand Down
14 changes: 14 additions & 0 deletions irohad/ametsuchi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ target_link_libraries(pool_wrapper
SOCI::core
)

add_library(rocksdb_block_storage
impl/rocksdb_block_storage.cpp
impl/rocksdb_block_storage_factory.cpp
)

target_link_libraries(rocksdb_block_storage
libs_files
shared_model_proto_backend
logger
Boost::boost
RocksDB::rocksdb
)

add_library(flat_file_storage
impl/flat_file/flat_file.cpp
impl/flat_file_block_storage.cpp
Expand Down Expand Up @@ -201,6 +214,7 @@ target_link_libraries(ametsuchi
rdb_connection_init
ametsuchi_rocksdb
flat_file_storage
rocksdb_block_storage
postgres_indexer
postgres_storage
logger
Expand Down
135 changes: 135 additions & 0 deletions irohad/ametsuchi/impl/rocksdb_block_storage.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ametsuchi/impl/rocksdb_block_storage.hpp"

#include "ametsuchi/impl/rocksdb_common.hpp"
#include "backend/protobuf/block.hpp"
#include "common/byteutils.hpp"
#include "logger/logger.hpp"

using namespace iroha::ametsuchi;

#define CHECK_OPERATION(command, ...) \
if (auto result = (__VA_ARGS__); expected::hasError(result)) { \
log_->error("Error while block {} " command ". Code: {}. Description: {}", \
block->height(), \
result.assumeError().code, \
result.assumeError().description); \
return false; \
}

namespace {
inline iroha::expected::Result<void, DbError> incrementTotalBlocksCount(
iroha::ametsuchi::RocksDbCommon &common) {
RDB_TRY_GET_VALUE(
opt_count,
forBlocksTotalCount<kDbOperation::kGet, kDbEntry::kCanExist>(common));

common.encode(opt_count ? *opt_count + 1ull : 1ull);
RDB_ERROR_CHECK(
forBlocksTotalCount<kDbOperation::kPut, kDbEntry::kMustExist>(common));

return {};
}
} // namespace

RocksDbBlockStorage::RocksDbBlockStorage(
std::shared_ptr<RocksDBContext> db_context,
std::shared_ptr<shared_model::interface::BlockJsonConverter> json_converter,
logger::LoggerPtr log)
: db_context_(std::move(db_context)),
json_converter_(std::move(json_converter)),
log_(std::move(log)) {}

bool RocksDbBlockStorage::insert(
std::shared_ptr<const shared_model::interface::Block> block) {
return json_converter_->serialize(*block).match(
[&](const auto &block_json) {
RocksDbCommon common(db_context_);
CHECK_OPERATION("insertion",
forBlock<kDbOperation::kCheck, kDbEntry::kMustNotExist>(
common, block->height()));

common.valueBuffer() = block_json.value;
CHECK_OPERATION("storing",
forBlock<kDbOperation::kPut>(common, block->height()));

CHECK_OPERATION("total count storing",
incrementTotalBlocksCount(common));
return true;
},
[this](const auto &error) {
log_->warn("Error while block serialization: {}", error.error);
return false;
});
}

boost::optional<std::unique_ptr<shared_model::interface::Block>>
RocksDbBlockStorage::fetch(
shared_model::interface::types::HeightType height) const {
RocksDbCommon common(db_context_);
if (auto result =
forBlock<kDbOperation::kGet, kDbEntry::kMustExist>(common, height);
expected::hasError(result)) {
log_->error("Error while block {} reading. Code: {}. Description: {}",
height,
result.assumeError().code,
result.assumeError().description);
return boost::none;
}

return json_converter_->deserialize(common.valueBuffer())
.match(
[&](auto &&block) {
return boost::make_optional<
std::unique_ptr<shared_model::interface::Block>>(
std::move(block.value));
},
[&](const auto &error)
-> boost::optional<
std::unique_ptr<shared_model::interface::Block>> {
log_->warn("Error while block deserialization: {}", error.error);
return boost::none;
});
}

size_t RocksDbBlockStorage::size() const {
RocksDbCommon common(db_context_);
if (auto result =
forBlocksTotalCount<kDbOperation::kGet, kDbEntry::kMustExist>(common);
expected::hasValue(result))
return *result.assumeValue();
return 0ull;
}

void RocksDbBlockStorage::reload() {}

void RocksDbBlockStorage::clear() {
RocksDbCommon common(db_context_);

if (auto status = common.filterDelete(fmtstrings::kPathWsv); !status.ok())
log_->error("Unable to delete WSV. Description: {}", status.ToString());

if (auto status = common.filterDelete(fmtstrings::kPathStore); !status.ok())
log_->error("Unable to delete STORE. Description: {}", status.ToString());
}

iroha::expected::Result<void, std::string> RocksDbBlockStorage::forEach(
iroha::ametsuchi::BlockStorage::FunctionType function) const {
uint64_t const blocks_count = size();
for (uint64_t ix = 1; ix <= blocks_count; ++ix) {
auto maybe_block = fetch(ix);
if (maybe_block) {
auto maybe_error = function(std::move(maybe_block).value());
if (iroha::expected::hasError(maybe_error)) {
return maybe_error.assumeError();
}
} else {
return fmt::format("Failed to fetch block {}", ix);
}
}
return {};
}
49 changes: 49 additions & 0 deletions irohad/ametsuchi/impl/rocksdb_block_storage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_ROCKSDB_BLOCK_STORAGE_HPP
#define IROHA_ROCKSDB_BLOCK_STORAGE_HPP

#include "ametsuchi/block_storage.hpp"

#include "interfaces/iroha_internal/block_json_converter.hpp"
#include "logger/logger_fwd.hpp"

namespace iroha::ametsuchi {
struct RocksDBContext;

class RocksDbBlockStorage : public BlockStorage {
public:
RocksDbBlockStorage(
std::shared_ptr<RocksDBContext> db_context,
std::shared_ptr<shared_model::interface::BlockJsonConverter>
json_converter,
logger::LoggerPtr log);

bool insert(
std::shared_ptr<const shared_model::interface::Block> block) override;

boost::optional<std::unique_ptr<shared_model::interface::Block>> fetch(
shared_model::interface::types::HeightType height) const override;

size_t size() const override;

void reload() override;

void clear() override;

expected::Result<void, std::string> forEach(
FunctionType function) const override;

private:
std::shared_ptr<RocksDBContext> db_context_;
std::shared_ptr<shared_model::interface::BlockJsonConverter>
json_converter_;
logger::LoggerPtr log_;
};

} // namespace iroha::ametsuchi

#endif // IROHA_ROCKSDB_BLOCK_STORAGE_HPP
28 changes: 28 additions & 0 deletions irohad/ametsuchi/impl/rocksdb_block_storage_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ametsuchi/impl/rocksdb_block_storage_factory.hpp"

#include "ametsuchi/impl/rocksdb_block_storage.hpp"
#include "ametsuchi/impl/rocksdb_common.hpp"

using namespace iroha::ametsuchi;

RocksDbBlockStorageFactory::RocksDbBlockStorageFactory(
std::shared_ptr<RocksDBContext> db_context,
std::shared_ptr<shared_model::interface::BlockJsonConverter>
json_block_converter,
logger::LoggerManagerTreePtr log_manager)
: db_context_(std::move(db_context)),
json_block_converter_(std::move(json_block_converter)),
log_manager_(std::move(log_manager)) {}

iroha::expected::Result<std::unique_ptr<BlockStorage>, std::string>
RocksDbBlockStorageFactory::create() {
return std::make_unique<RocksDbBlockStorage>(
db_context_,
json_block_converter_,
log_manager_->getChild("RocksDbBlockFactory")->getLogger());
}
37 changes: 37 additions & 0 deletions irohad/ametsuchi/impl/rocksdb_block_storage_factory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_ROCKSDB_BLOCK_STORAGE_FACTORY_HPP
#define IROHA_ROCKSDB_BLOCK_STORAGE_FACTORY_HPP

#include "ametsuchi/block_storage_factory.hpp"

#include "interfaces/iroha_internal/block_json_converter.hpp"
#include "logger/logger_manager.hpp"

namespace iroha::ametsuchi {
struct RocksDBContext;

class RocksDbBlockStorageFactory : public BlockStorageFactory {
public:
RocksDbBlockStorageFactory(
std::shared_ptr<RocksDBContext> db_context,
std::shared_ptr<shared_model::interface::BlockJsonConverter>
json_block_converter,
logger::LoggerManagerTreePtr log_manager);

iroha::expected::Result<std::unique_ptr<BlockStorage>, std::string> create()
override;

private:
std::shared_ptr<RocksDBContext> db_context_;
std::shared_ptr<shared_model::interface::BlockJsonConverter>
json_block_converter_;
logger::LoggerManagerTreePtr log_manager_;
};

} // namespace iroha::ametsuchi

#endif // IROHA_ROCKSDB_BLOCK_STORAGE_FACTORY_HPP
3 changes: 0 additions & 3 deletions irohad/ametsuchi/impl/rocksdb_command_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,10 +752,7 @@ RocksDbCommandExecutor::ExecutionResult RocksDbCommandExecutor::operator()(
bool do_validation,
shared_model::interface::RolePermissionSet const &creator_permissions) {
auto const &[account_name, domain_id] = staticSplitId<2>(command.accountId());

auto const revoked_perm = command.permissionName();
auto const required_perm =
shared_model::interface::permissions::permissionFor(revoked_perm);

if (do_validation) {
// check if account exists
Expand Down
42 changes: 41 additions & 1 deletion irohad/ametsuchi/impl/rocksdb_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* | +-<height_2, value:block>
* | +-<height_3, value:block>
* | +-<version>
* | +-<blocks_total_count, value>
* |
* +-|WSV|-+-|NETWORK|-+-|PEERS|-+-|ADDRESS|-+-<peer_1_pubkey, value:address>
* | | | +-<peer_2_pubkey, value:address>
Expand All @@ -41,7 +42,6 @@
* | | +-<count, value>
* | |
* | +-|STORE|-+-<top_block, value: store height#top block hash>
* | +-<total transactions count>
* |
* +-|SETTINGS|-+-<key_1, value_1>
* | +-<key_2, value_2>
Expand Down Expand Up @@ -206,6 +206,8 @@ namespace iroha::ametsuchi::fmtstrings {

static auto constexpr kPathWsv{FMT_STRING(RDB_ROOT /**/ RDB_WSV)};

static auto constexpr kPathStore{FMT_STRING(RDB_ROOT /**/ RDB_STORE)};

// domain_id/account_name
static auto constexpr kPathAccount{FMT_STRING(RDB_PATH_ACCOUNT)};

Expand Down Expand Up @@ -244,6 +246,10 @@ namespace iroha::ametsuchi::fmtstrings {
* ############# FOLDERS ################
* ######################################
*/
// height ➡️ block data
static auto constexpr kBlockDataInStore{
FMT_STRING(RDB_ROOT /**/ RDB_STORE /**/ RDB_XXX)};

// account/height/index/ts ➡️ tx_hash
static auto constexpr kTransactionByPosition{FMT_STRING(
RDB_ROOT /**/ RDB_WSV /**/ RDB_TRANSACTIONS /**/ RDB_ACCOUNTS /**/
Expand Down Expand Up @@ -344,6 +350,10 @@ namespace iroha::ametsuchi::fmtstrings {
FMT_STRING(RDB_ROOT /**/ RDB_WSV /**/ RDB_TRANSACTIONS /**/
RDB_ACCOUNTS /**/ RDB_XXX /**/ RDB_F_TOTAL_COUNT)};

// ➡️ value
static auto constexpr kBlocksTotalCount{
FMT_STRING(RDB_ROOT /**/ RDB_STORE /**/ RDB_F_TOTAL_COUNT)};

// ➡️ txs total count
static auto constexpr kAllTxsTotalCount{FMT_STRING(
RDB_ROOT /**/ RDB_WSV /**/ RDB_TRANSACTIONS /**/ RDB_F_TOTAL_COUNT)};
Expand Down Expand Up @@ -1103,6 +1113,36 @@ namespace iroha::ametsuchi {
return dbCall<IrohadVersion, kOp, kSc>(common, fmtstrings::kWsvVersion);
}

/**
* Access to Stored blocks data.
* @tparam kOp @see kDbOperation
* @tparam kSc @see kDbEntry
* @param common @see RocksDbCommon
* @param height of the block
* @return operation result
*/
template <kDbOperation kOp = kDbOperation::kGet,
kDbEntry kSc = kDbEntry::kMustExist>
inline expected::Result<std::optional<std::string_view>, DbError> forBlock(
RocksDbCommon &common, uint64_t height) {
return dbCall<std::string_view, kOp, kSc>(
common, fmtstrings::kBlockDataInStore, height);
}

/**
* Access to Block store size.
* @tparam kOp @see kDbOperation
* @tparam kSc @see kDbEntry
* @param common @see RocksDbCommon
* @return operation result
*/
template <kDbOperation kOp = kDbOperation::kGet,
kDbEntry kSc = kDbEntry::kMustExist>
inline expected::Result<std::optional<uint64_t>, DbError> forBlocksTotalCount(
RocksDbCommon &common) {
return dbCall<uint64_t, kOp, kSc>(common, fmtstrings::kBlocksTotalCount);
}

/**
* Access to account quorum file.
* @tparam kOp @see kDbOperation
Expand Down
Loading

0 comments on commit 6dd0a32

Please sign in to comment.