Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix flaky unstable network tests #5013

Closed
Closed
36 changes: 18 additions & 18 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use test_samples::{gen_account_in, ALICE_ID, BOB_ID};
// This test is also covered at the UI level in the iroha_cli tests
// in test_register_asset_definitions.py
fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_620).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_620).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

// Given
let account_id = ALICE_ID.clone();
Expand Down Expand Up @@ -59,8 +59,8 @@ fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {

#[test]
fn unregister_asset_should_remove_asset_from_account() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_555).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_555).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

// Given
let account_id = ALICE_ID.clone();
Expand Down Expand Up @@ -107,8 +107,8 @@ fn unregister_asset_should_remove_asset_from_account() -> Result<()> {
// This test is also covered at the UI level in the iroha_cli tests
// in test_mint_assets.py
fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_000).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_000).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

// Given
let account_id = ALICE_ID.clone();
Expand Down Expand Up @@ -141,8 +141,8 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() ->

#[test]
fn client_add_big_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_510).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_510).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

// Given
let account_id = ALICE_ID.clone();
Expand Down Expand Up @@ -175,8 +175,8 @@ fn client_add_big_asset_quantity_to_existing_asset_should_increase_asset_amount(

#[test]
fn client_add_asset_with_decimal_should_increase_asset_amount() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_515).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_515).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

// Given
let account_id = ALICE_ID.clone();
Expand Down Expand Up @@ -235,8 +235,8 @@ fn client_add_asset_with_decimal_should_increase_asset_amount() -> Result<()> {
// This test is also covered at the UI level in the iroha_cli tests
// in test_register_asset_definitions.py
fn client_add_asset_with_name_length_more_than_limit_should_not_commit_transaction() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_520).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_520).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));
let pipeline_time = Config::pipeline_time();

// Given
Expand Down Expand Up @@ -281,8 +281,8 @@ fn client_add_asset_with_name_length_more_than_limit_should_not_commit_transacti
#[allow(clippy::expect_fun_call)]
#[test]
fn find_rate_and_make_exchange_isi_should_succeed() {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_675).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_675).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

let (dex_id, _dex_keypair) = gen_account_in("exchange");
let (seller_id, seller_keypair) = gen_account_in("company");
Expand Down Expand Up @@ -375,8 +375,8 @@ fn find_rate_and_make_exchange_isi_should_succeed() {

#[test]
fn transfer_asset_definition() {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_060).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_060).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

let alice_id = ALICE_ID.clone();
let bob_id = BOB_ID.clone();
Expand Down Expand Up @@ -413,8 +413,8 @@ fn transfer_asset_definition() {

#[test]
fn fail_if_dont_satisfy_spec() {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_125).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_125).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[test_client.clone()]));

let alice_id = ALICE_ID.clone();
let bob_id = BOB_ID.clone();
Expand Down
4 changes: 2 additions & 2 deletions client/tests/integration/asset_propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use test_samples::gen_account_in;
fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount_on_another_peer(
) -> Result<()> {
// Given
let (_rt, network, client) = Network::start_test_with_runtime(4, Some(10_450));
wait_for_genesis_committed(&network.clients(), 0);
let (rt, network, client) = Network::start_test_with_runtime(4, Some(10_450));
rt.block_on(wait_for_genesis_committed_async(&network.clients()));
let pipeline_time = Config::pipeline_time();

client.submit_blocking(SetParameter::new(Parameter::Block(
Expand Down
8 changes: 4 additions & 4 deletions client/tests/integration/events/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ fn transaction_execution_should_produce_events(
executable: impl Into<Executable>,
port: u16,
) -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(port).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);
let (rt, _peer, client) = <PeerBuilder>::new().with_port(port).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[client.clone()]));

// spawn event reporter
let listener = client.clone();
Expand Down Expand Up @@ -178,8 +178,8 @@ fn transaction_execution_should_produce_events(
#[test]
#[allow(clippy::too_many_lines)]
fn produce_multiple_events() -> Result<()> {
let (_rt, _peer, client) = <PeerBuilder>::new().with_port(10_645).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);
let (rt, _peer, client) = <PeerBuilder>::new().with_port(10_645).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[client.clone()]));

// Spawn event reporter
let listener = client.clone();
Expand Down
8 changes: 4 additions & 4 deletions client/tests/integration/events/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use test_samples::ALICE_ID;

#[test]
fn trigger_completion_success_should_produce_event() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_050).start_with_runtime();
wait_for_genesis_committed(&vec![test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_050).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&vec![test_client.clone()]));

let asset_definition_id = "rose#wonderland".parse()?;
let account_id = ALICE_ID.clone();
Expand Down Expand Up @@ -53,8 +53,8 @@ fn trigger_completion_success_should_produce_event() -> Result<()> {

#[test]
fn trigger_completion_failure_should_produce_event() -> Result<()> {
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_055).start_with_runtime();
wait_for_genesis_committed(&vec![test_client.clone()], 0);
let (rt, _peer, test_client) = <PeerBuilder>::new().with_port(11_055).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&vec![test_client.clone()]));

let account_id = ALICE_ID.clone();
let trigger_id = TriggerId::from_str("fail_box")?;
Expand Down
8 changes: 4 additions & 4 deletions client/tests/integration/events/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ fn test_with_instruction_and_status_and_port(
should_be: &TransactionStatus,
port: u16,
) -> Result<()> {
let (_rt, network, client) =
let (rt, network, client) =
Network::start_test_with_runtime(PEER_COUNT.try_into().unwrap(), Some(port));
let clients = network.clients();
wait_for_genesis_committed(&clients, 0);
rt.block_on(wait_for_genesis_committed_async(&clients));
let pipeline_time = Config::pipeline_time();

client.submit_blocking(SetParameter::new(Parameter::Block(
Expand Down Expand Up @@ -106,8 +106,8 @@ impl Checker {

#[test]
fn applied_block_must_be_available_in_kura() {
let (_rt, peer, client) = <PeerBuilder>::new().with_port(11_040).start_with_runtime();
wait_for_genesis_committed(&[client.clone()], 0);
let (rt, peer, client) = <PeerBuilder>::new().with_port(11_040).start_with_runtime();
rt.block_on(wait_for_genesis_committed_async(&[client.clone()]));

let event_filter = BlockEventFilter::default().for_status(BlockStatus::Applied);
let mut event_iter = client
Expand Down
118 changes: 77 additions & 41 deletions client/tests/integration/extra_functional/connected_peers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::thread;
use std::time::Duration;

use eyre::{Context, Result};
use iroha::{
Expand All @@ -9,10 +9,10 @@ use iroha::{
},
};
use iroha_config::parameters::actual::Root as Config;
use iroha_data_model::{domain::Domain, prelude::FindPeers, query::builder::QueryBuilderExt};
use iroha_primitives::unique_vec;
use rand::{seq::SliceRandom, thread_rng, Rng};
use test_network::*;
use tokio::runtime::Runtime;

#[ignore = "ignore, more in #2851"]
#[test]
Expand All @@ -27,39 +27,44 @@ fn connected_peers_with_f_1_0_1() -> Result<()> {

#[test]
fn register_new_peer() -> Result<()> {
let (_rt, network, _) = Network::start_test_with_runtime(4, Some(11_180));
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Config::pipeline_time();
let (rt, network, mut iroha) = Network::start_test_with_runtime(4, Some(11_180));
iroha.transaction_ttl = Some(Duration::from_millis(u64::MAX));
iroha.transaction_status_timeout = Duration::from_millis(u64::MAX);
rt.block_on(wait_for_genesis_committed_async(&network.clients()));
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider compressing into a method, this repeats often

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's wroth extracting into test_network because this function is kinda specific for this particular tests...


let mut peer_clients: Vec<_> = Network::peers(&network)
.zip(Network::clients(&network))
.collect();

check_status(&peer_clients, 1);
check_status(&rt, &peer_clients, 1);

// Start new peer
let mut configuration = Config::test();
configuration.sumeragi.trusted_peers.value_mut().others =
unique_vec![peer_clients.choose(&mut thread_rng()).unwrap().0.id.clone()];
let rt = Runtime::test();
let new_peer = rt.block_on(
PeerBuilder::new()
.with_config(configuration)
.with_into_genesis(WithGenesis::None)
.with_port(11_225)
.start(),
);
let mut new_peer_client = Client::test(&new_peer.api_address);
new_peer_client.transaction_ttl = Some(Duration::from_millis(u64::MAX));
new_peer_client.transaction_status_timeout = Duration::from_millis(u64::MAX);

let register_peer = Register::peer(DataModelPeer::new(new_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(register_peer)?;
peer_clients.push((&new_peer, Client::test(&new_peer.api_address)));
thread::sleep(pipeline_time * 2 * 20); // Wait for some time to allow peers to connect
iroha.submit_blocking(register_peer)?;

check_status(&peer_clients, 2);
// Submit transaction through a new peer and wait for response to check that it is functioning properly
let isi = Register::domain(Domain::new("dummy".parse().unwrap()));
new_peer_client
.submit_blocking(isi)
.expect("failed to submit transaction through new peer");

peer_clients.push((&new_peer, new_peer_client));

check_status(&rt, &peer_clients, 3);

Ok(())
}
Expand All @@ -68,63 +73,94 @@ fn register_new_peer() -> Result<()> {
fn connected_peers_with_f(faults: u64, start_port: Option<u16>) -> Result<()> {
let n_peers = 3 * faults + 1;

let (_rt, network, _) = Network::start_test_with_runtime(
let (rt, network, mut iroha) = Network::start_test_with_runtime(
(n_peers)
.try_into()
.wrap_err("`faults` argument `u64` value too high, cannot convert to `u32`")?,
start_port,
);
wait_for_genesis_committed(&network.clients(), 0);
let pipeline_time = Config::pipeline_time();
iroha.transaction_ttl = Some(Duration::from_millis(u64::MAX));
iroha.transaction_status_timeout = Duration::from_millis(u64::MAX);
rt.block_on(wait_for_genesis_committed_async(&network.clients()));

let mut peer_clients: Vec<_> = Network::peers(&network)
.zip(Network::clients(&network))
.collect();

check_status(&peer_clients, 1);
check_status(&rt, &peer_clients, 1);

// Unregister a peer: committed with f = `faults` then `status.peers` decrements
let removed_peer_idx = rand::thread_rng().gen_range(0..peer_clients.len());
let (removed_peer, _) = &peer_clients[removed_peer_idx];
let removed_peer_idx = rand::thread_rng().gen_range(1..peer_clients.len());
let (removed_peer, mut removed_peer_client) = peer_clients.remove(removed_peer_idx);
removed_peer_client.transaction_ttl = Some(Duration::from_millis(u64::MAX));
removed_peer_client.transaction_status_timeout = Duration::from_millis(u64::MAX);

// Check that peer is present in topology
assert!(iroha
.query(FindPeers)
.execute_all()?
.into_iter()
.any(|peer| peer.id == removed_peer.id));

let unregister_peer = Unregister::peer(removed_peer.id.clone());
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(unregister_peer)?;
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect
let (removed_peer, removed_peer_client) = peer_clients.remove(removed_peer_idx);
iroha.submit_blocking(unregister_peer)?;

thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to disconnect
// Check that peer is removed from topology
assert!(iroha
.query(FindPeers)
.execute_all()?
.into_iter()
.all(|peer| peer.id != removed_peer.id));

check_status(&rt, &peer_clients, 2);

check_status(&peer_clients, 2);
let status = removed_peer_client.get_status()?;
// Peer might have been disconnected before getting the block
assert!(status.blocks == 1 || status.blocks == 2);
assert_eq!(status.peers, 0);

// Re-register the peer: committed with f = `faults` - 1 then `status.peers` increments
let register_peer = Register::peer(DataModelPeer::new(removed_peer.id.clone()));
peer_clients
.choose(&mut thread_rng())
.unwrap()
.1
.submit_blocking(register_peer)?;
peer_clients.insert(removed_peer_idx, (removed_peer, removed_peer_client));
thread::sleep(pipeline_time * 2); // Wait for some time to allow peers to connect
iroha.submit_blocking(register_peer)?;

// Check that peer is present in topology again
assert!(iroha
.query(FindPeers)
.execute_all()?
.into_iter()
.any(|peer| peer.id == removed_peer.id));

check_status(&peer_clients, 3);
// Submit transaction by reconnected peer to check if it's functioning
removed_peer_client
.submit_blocking(Register::domain(Domain::new("dummy".parse().unwrap())))
.wrap_err("reconnected peer failed to submit transaction")?;

peer_clients.push((removed_peer, removed_peer_client));

check_status(&rt, &peer_clients, 4);

Ok(())
}

fn check_status(peer_clients: &[(&Peer, Client)], expected_blocks: u64) {
/// Wait for certain amount of blocks and check number of connected peers
fn check_status(
rt: &tokio::runtime::Runtime,
peer_clients: &[(&Peer, Client)],
expected_blocks: usize,
) {
let n_peers = peer_clients.len() as u64;

let clients = peer_clients
.iter()
.map(|(_, client)| client)
.cloned()
.collect::<Vec<_>>();

rt.block_on(wait_for_blocks_committed_async(&clients, expected_blocks));

for (_, peer_client) in peer_clients {
let status = peer_client.get_status().unwrap();

assert_eq!(status.peers, n_peers - 1);
assert_eq!(status.blocks, expected_blocks);
assert_eq!(status.blocks, expected_blocks as u64);
}
}
6 changes: 3 additions & 3 deletions client/tests/integration/extra_functional/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use iroha::data_model::{
domain::{Domain, DomainId},
isi::Register,
};
use test_network::{wait_for_genesis_committed, NetworkBuilder};
use test_network::{wait_for_genesis_committed_async, NetworkBuilder};

#[test]
fn all_peers_submit_genesis() {
Expand All @@ -20,10 +20,10 @@ fn multiple_genesis_4_peers_2_genesis() {
}

fn multiple_genesis_peers(n_peers: u32, n_genesis_peers: u32, port: u16) {
let (_rt, network, client) = NetworkBuilder::new(n_peers, Some(port))
let (rt, network, client) = NetworkBuilder::new(n_peers, Some(port))
.with_genesis_peers(n_genesis_peers)
.create_with_runtime();
wait_for_genesis_committed(&network.clients(), 0);
rt.block_on(wait_for_genesis_committed_async(&network.clients()));

let domain_id: DomainId = "foo".parse().expect("Valid");
let create_domain = Register::domain(Domain::new(domain_id));
Expand Down
Loading
Loading