Skip to content

Commit

Permalink
fix(ingester): log bubblegum indexing errors (metaplex-foundation#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
NicolasPennie authored May 31, 2023
1 parent d41175b commit 75c11f8
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 13 deletions.
1 change: 1 addition & 0 deletions nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData) -> Opt
item.tries,
res,
begin_processing,
None,
);
}
ret_id
Expand Down
5 changes: 1 addition & 4 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, TRANSACTION_STREAM,
};
use tokio::{
signal,
task::{JoinSet},
};
use tokio::{signal, task::JoinSet};

#[tokio::main(flavor = "multi_thread")]
pub async fn main() -> Result<(), IngesterError> {
Expand Down
19 changes: 16 additions & 3 deletions nft_ingester/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub fn capture_result(
tries: usize,
res: Result<(), IngesterError>,
proc: Instant,
txn_sig: Option<&str>,
) -> Option<String> {
let mut ret_id = None;
match res {
Expand Down Expand Up @@ -75,18 +76,30 @@ pub fn capture_result(
metric! {
statsd_count!("ingester.ingest_error", 1, label.0 => &label.1, "stream" => stream, "error" => "de");
}
warn!("{}", e);
if let Some(sig) = txn_sig {
warn!("Error deserializing txn {}: {:?}", sig, e);
} else {
warn!("{}", e);
}
ret_id = Some(id);
}
Err(IngesterError::ParsingError(e)) => {
metric! {
statsd_count!("ingester.ingest_error", 1, label.0 => &label.1, "stream" => stream, "error" => "parse");
}
warn!("{}", e);
if let Some(sig) = txn_sig {
warn!("Error parsing txn {}: {:?}", sig, e);
} else {
warn!("{}", e);
}
ret_id = Some(id);
}
Err(err) => {
error!("Error handling account update: {:?}", err);
if let Some(sig) = txn_sig {
error!("Error handling update for txn {}: {:?}", sig, err);
} else {
error!("Error handling account update: {:?}", err);
}
metric! {
statsd_count!("ingester.ingest_update_error", 1, label.0 => &label.1, "stream" => stream, "error" => "u");
}
Expand Down
14 changes: 11 additions & 3 deletions nft_ingester/src/program_transformers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use blockbuster::{
};
use log::{debug, error, info};
use plerkle_serialization::{AccountInfo, Pubkey as FBPubkey, TransactionInfo};
use sea_orm::{DatabaseConnection, SqlxPostgresConnector, TransactionTrait};
use sea_orm::{DatabaseConnection, SqlxPostgresConnector};
use solana_sdk::pubkey::Pubkey;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -69,7 +69,8 @@ impl ProgramTransformer {
&self,
tx: &'a TransactionInfo<'a>,
) -> Result<(), IngesterError> {
info!("Handling Transaction: {:?}", tx.signature());
let sig: Option<&str> = tx.signature();
info!("Handling Transaction: {:?}", sig);
let instructions = self.break_transaction(&tx);
let accounts = tx.account_keys().unwrap_or_default();
let slot = tx.slot();
Expand Down Expand Up @@ -125,7 +126,14 @@ impl ProgramTransformer {
&self.storage,
&self.task_sender,
)
.await?;
.await
.map_err(|err| {
error!(
"Failed to handle bubblegum instruction for txn {:?}: {:?}",
sig, err
);
return err;
})?;
}
_ => {
not_impl += 1;
Expand Down
2 changes: 2 additions & 0 deletions nft_ingester/src/transaction_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData) ->
"stream" => TRANSACTION_STREAM
);
}

let begin = Instant::now();
let res = manager.handle_transaction(&tx).await;
ret_id = capture_result(
Expand All @@ -99,6 +100,7 @@ async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData) ->
item.tries,
res,
begin,
tx.signature(),
);
}
ret_id
Expand Down
17 changes: 14 additions & 3 deletions tools/txn_forwarder/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
# Transaction Forwarder

## Run locally
## Send single transaction

```
cargo run -- \
--redis-url 'redis://localhost:6379' \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 3 \
--concurrency 10 \
single --txn 65MtykBysKAofpvKMkGPYotxQYFRHM47g99iCs6B9ZxfAbBmHKeLi2LSUA8KUcm4qYsot2z9AB4uREuUuEQNw8HA
```

## Backfill tree locally

```
cargo run -- \
--redis-url 'redis://localhost:6379' \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 10 \
address --address Cu61XHSkbasbvBc3atv5NUMz6C8FYmocNkH7mtjLFjR7
```

## Run against an env
## Backfill tree against Dev/Prod

```
cargo run -- \
Expand Down

0 comments on commit 75c11f8

Please sign in to comment.