forked from film42/sidekiq-rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduled.rs
77 lines (61 loc) · 2.17 KB
/
scheduled.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::{periodic::PeriodicJob, RedisPool, UnitOfWork};
use tracing::debug;
pub struct Scheduled {
redis: RedisPool,
}
impl Scheduled {
#[must_use]
pub fn new(redis: RedisPool) -> Self {
Self { redis }
}
pub async fn enqueue_jobs(
&self,
now: chrono::DateTime<chrono::Utc>,
sorted_sets: &Vec<String>,
) -> Result<usize, Box<dyn std::error::Error>> {
let mut n = 0;
for sorted_set in sorted_sets {
let mut redis = self.redis.get().await?;
let jobs: Vec<String> = redis
.zrangebyscore_limit(sorted_set.clone(), "-inf", now.timestamp(), 0, 100)
.await?;
n += jobs.len();
for job in jobs {
if redis.zrem(sorted_set.clone(), job.clone()).await? {
let work = UnitOfWork::from_job_string(job)?;
debug!({
"class" = &work.job.class,
"queue" = &work.queue
}, "Enqueueing job");
work.enqueue_direct(&mut redis).await?;
}
}
}
Ok(n)
}
pub async fn enqueue_periodic_jobs(
&self,
now: chrono::DateTime<chrono::Utc>,
) -> Result<usize, Box<dyn std::error::Error>> {
let mut conn = self.redis.get().await?;
let periodic_jobs: Vec<String> = conn
.zrangebyscore_limit("periodic".to_string(), "-inf", now.timestamp(), 0, 100)
.await?;
for periodic_job in &periodic_jobs {
let pj = PeriodicJob::from_periodic_job_string(periodic_job.clone())?;
if pj.update(&mut conn, periodic_job).await? {
let job = pj.into_job();
let work = UnitOfWork::from_job(job);
debug!({
"args" = &pj.args,
"class" = &work.job.class,
"queue" = &work.queue,
"name" = &pj.name,
"cron" = &pj.cron,
}, "Enqueueing periodic job");
work.enqueue_direct(&mut conn).await?;
}
}
Ok(periodic_jobs.len())
}
}