Skip to content

Commit

Permalink
fix(rust): implement a throttler to prevent hitting the rate limit an…
Browse files Browse the repository at this point in the history
…d being banned.
  • Loading branch information
nkaz001 committed Aug 18, 2024
1 parent 39db06b commit 1fce00b
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 24 deletions.
28 changes: 20 additions & 8 deletions collector/src/binance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ pub use http::{fetch_depth_snapshot, fetch_symbol_list, keep_connection};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tracing::{error, warn};

use crate::error::ConnectorError;
use crate::{error::ConnectorError, throttler::Throttler};

fn handle(
prev_u_map: &mut HashMap<String, i64>,
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
recv_time: DateTime<Utc>,
data: String,
throttler: &Throttler,
) -> Result<(), ConnectorError> {
let j: serde_json::Value = serde_json::from_str(&data)?;
if let Some(j_data) = j.get("data") {
Expand All @@ -39,24 +40,28 @@ fn handle(
let prev_u = prev_u_map.get(symbol);
if prev_u.is_none() || U != *prev_u.unwrap() + 1 {
warn!(%symbol, "missing depth feed has been detected.");
// todo: to circumvent API limits when repeated occurrences of missing depth
// feed happen within a short timeframe, implementing a backoff mechanism
// may be necessary.
let symbol_ = symbol.to_string();
let writer_tx_ = writer_tx.clone();
let mut throttler_ = throttler.clone();
tokio::spawn(async move {
match fetch_depth_snapshot(&symbol_).await {
Ok(data) => {
match throttler_.execute(fetch_depth_snapshot(&symbol_)).await {
Some(Ok(data)) => {
let recv_time = Utc::now();
let _ = writer_tx_.send((recv_time, symbol_, data));
}
Err(error) => {
Some(Err(error)) => {
error!(
symbol = symbol_,
?error,
"couldn't fetch the depth snapshot."
);
}
None => {
warn!(
symbol = symbol_,
"Fetching the depth snapshot is rate-limited."
)
}
}
});
}
Expand All @@ -77,10 +82,17 @@ pub async fn run_collection(
let mut prev_u_map = HashMap::new();
let (ws_tx, mut ws_rx) = unbounded_channel();
let h = tokio::spawn(keep_connection(streams, symbols, ws_tx.clone()));
// todo: check the Spot API rate limits.
// https://www.binance.com/en/support/faq/rate-limits-on-binance-futures-281596e222414cdd9051664ea621cdc3
// The default rate limit per IP is 2,400/min and the weight is 20 at a depth of 1000.
// The maximum request rate for fetching snapshots is 120 per minute.
// Sets the rate limit with a margin to account for connection requests.
let throttler = Throttler::new(100);
loop {
match ws_rx.recv().await {
Some((recv_time, data)) => {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data) {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data, &throttler)
{
error!(?error, "couldn't handle the received data.");
}
}
Expand Down
27 changes: 19 additions & 8 deletions collector/src/binancefuturescm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ pub use http::{fetch_depth_snapshot, fetch_symbol_list, keep_connection};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tracing::{error, warn};

use crate::error::ConnectorError;
use crate::{error::ConnectorError, throttler::Throttler};

fn handle(
prev_u_map: &mut HashMap<String, i64>,
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
recv_time: DateTime<Utc>,
data: String,
throttler: &Throttler,
) -> Result<(), ConnectorError> {
let j: serde_json::Value = serde_json::from_str(&data)?;
if let Some(j_data) = j.get("data") {
Expand Down Expand Up @@ -42,24 +43,28 @@ fn handle(
let prev_u = prev_u_map.get(symbol);
if prev_u.is_none() || pu != *prev_u.unwrap() {
warn!(%symbol, "missing depth feed has been detected.");
// todo: to circumvent API limits when repeated occurrences of missing depth
// feed happen within a short timeframe, implementing a backoff mechanism
// may be necessary.
let symbol_ = symbol.to_string();
let writer_tx_ = writer_tx.clone();
let mut throttler_ = throttler.clone();
tokio::spawn(async move {
match fetch_depth_snapshot(&symbol_).await {
Ok(data) => {
match throttler_.execute(fetch_depth_snapshot(&symbol_)).await {
Some(Ok(data)) => {
let recv_time = Utc::now();
let _ = writer_tx_.send((recv_time, symbol_, data));
}
Err(error) => {
Some(Err(error)) => {
error!(
symbol = symbol_,
?error,
"couldn't fetch the depth snapshot."
);
}
None => {
warn!(
symbol = symbol_,
"Fetching the depth snapshot is rate-limited."
)
}
}
});
}
Expand All @@ -79,10 +84,16 @@ pub async fn run_collection(
let mut prev_u_map = HashMap::new();
let (ws_tx, mut ws_rx) = unbounded_channel();
let h = tokio::spawn(keep_connection(streams, symbols, ws_tx.clone()));
// https://www.binance.com/en/support/faq/rate-limits-on-binance-futures-281596e222414cdd9051664ea621cdc3
// The default rate limit per IP is 2,400/min and the weight is 20 at a depth of 1000.
// The maximum request rate for fetching snapshots is 120 per minute.
// Sets the rate limit with a margin to account for connection requests.
let throttler = Throttler::new(100);
loop {
match ws_rx.recv().await {
Some((recv_time, data)) => {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data) {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data, &throttler)
{
error!(?error, "couldn't handle the received data.");
}
}
Expand Down
27 changes: 19 additions & 8 deletions collector/src/binancefuturesum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ pub use http::{fetch_depth_snapshot, fetch_symbol_list, keep_connection};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tracing::{error, warn};

use crate::error::ConnectorError;
use crate::{error::ConnectorError, throttler, throttler::Throttler};

fn handle(
prev_u_map: &mut HashMap<String, i64>,
writer_tx: &UnboundedSender<(DateTime<Utc>, String, String)>,
recv_time: DateTime<Utc>,
data: String,
throttler: &Throttler,
) -> Result<(), ConnectorError> {
let j: serde_json::Value = serde_json::from_str(&data)?;
if let Some(j_data) = j.get("data") {
Expand Down Expand Up @@ -42,24 +43,28 @@ fn handle(
let prev_u = prev_u_map.get(symbol);
if prev_u.is_none() || pu != *prev_u.unwrap() {
warn!(%symbol, "missing depth feed has been detected.");
// todo: to circumvent API limits when repeated occurrences of missing depth
// feed happen within a short timeframe, implementing a backoff mechanism
// may be necessary.
let symbol_ = symbol.to_string();
let writer_tx_ = writer_tx.clone();
let mut throttler_ = throttler.clone();
tokio::spawn(async move {
match fetch_depth_snapshot(&symbol_).await {
Ok(data) => {
match throttler_.execute(fetch_depth_snapshot(&symbol_)).await {
Some(Ok(data)) => {
let recv_time = Utc::now();
let _ = writer_tx_.send((recv_time, symbol_, data));
}
Err(error) => {
Some(Err(error)) => {
error!(
symbol = symbol_,
?error,
"couldn't fetch the depth snapshot."
);
}
None => {
warn!(
symbol = symbol_,
"Fetching the depth snapshot is rate-limited."
)
}
}
});
}
Expand All @@ -79,10 +84,16 @@ pub async fn run_collection(
let mut prev_u_map = HashMap::new();
let (ws_tx, mut ws_rx) = unbounded_channel();
let h = tokio::spawn(keep_connection(streams, symbols, ws_tx.clone()));
// https://www.binance.com/en/support/faq/rate-limits-on-binance-futures-281596e222414cdd9051664ea621cdc3
// The default rate limit per IP is 2,400/min and the weight is 20 at a depth of 1000.
// The maximum request rate for fetching snapshots is 120 per minute.
// Sets the rate limit with a margin to account for connection requests.
let throttler = Throttler::new(100);
loop {
match ws_rx.recv().await {
Some((recv_time, data)) => {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data) {
if let Err(error) = handle(&mut prev_u_map, &writer_tx, recv_time, data, &throttler)
{
error!(?error, "couldn't handle the received data.");
}
}
Expand Down
1 change: 1 addition & 0 deletions collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod binancefuturesum;
mod bybit;
mod error;
mod file;
mod throttler;

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
Expand Down
35 changes: 35 additions & 0 deletions collector/src/throttler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::{future::Future, sync::Arc};

use chrono::Utc;
use tokio::sync::Mutex;

#[derive(Clone)]
pub struct Throttler {
exec_ts: Arc<Mutex<Vec<i64>>>,
rate_limit: usize,
}

impl Throttler {
pub fn new(rate_limit: usize) -> Self {
Self {
exec_ts: Default::default(),
rate_limit,
}
}

pub async fn execute<Fut, T>(&mut self, fut: Fut) -> Option<T>
where
Fut: Future<Output = T>,
{
let cur_ts = Utc::now().timestamp_nanos_opt().unwrap();
{
let mut exec_ts_ = self.exec_ts.lock().await;
exec_ts_.retain(|ts| *ts > cur_ts - 60_000_000_000);
if exec_ts_.len() > self.rate_limit {
return None;
}
exec_ts_.push(cur_ts);
}
Some(fut.await)
}
}

0 comments on commit 1fce00b

Please sign in to comment.