Skip to content

Commit

Permalink
Added demo program to show use of LISTEN/NOTIFY.
Browse files Browse the repository at this point in the history
  • Loading branch information
thedodd committed Jan 31, 2020
1 parent 847de7a commit 59b2acb
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = [
".",
"sqlx-core",
"sqlx-macros",
"examples/postgres-listen",
"examples/realworld-postgres"
]

Expand Down
10 changes: 10 additions & 0 deletions examples/postgres-listen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "postgres-listen"
version = "0.1.0"
edition = "2018"
workspace = "../.."

[dependencies]
async-std = { version = "1.4.0", features = [ "attributes", "unstable" ] }
sqlx = { path = "../..", features = [ "postgres", "tls" ] }
futures = "0.3.1"
18 changes: 18 additions & 0 deletions examples/postgres-listen/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Postgres LISTEN/NOTIFY
======================

## Usage

Declare the database URL. This example does not include any reading or writing of data.

```
export DATABASE_URL="postgres://postgres@localhost/postgres"
```

Run.

```
cargo run
```

The example program should connect to the database, and create a LISTEN loop on a predefined set of channels. A NOTIFY task will be spawned which will connect to the same database and will emit notifications on a 5 second interval.
57 changes: 57 additions & 0 deletions examples/postgres-listen/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use async_std::stream;
use futures::stream::StreamExt;
use sqlx::postgres::PgPoolExt;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Building PG pool.");
let conn_str =
std::env::var("DATABASE_URL").expect("Env var DATABASE_URL is required for this example.");
let pool = sqlx::PgPool::new(&conn_str).await?;

let notify_pool = pool.clone();
let _t = async_std::task::spawn(async move {
stream::interval(Duration::from_secs(5))
.for_each(move |_| notify(notify_pool.clone()))
.await
});

println!("Starting LISTEN loop.");
let mut listener = pool.listen(&["chan0", "chan1", "chan2"]);
let mut counter = 0usize;
loop {
let res = listener.recv().await;
println!("[from recv]: {:?}", res);
counter += 1;
if counter >= 3 {
break;
}
}

let stream = listener.into_stream();
futures::pin_mut!(stream);
while let Some(res) = stream.next().await {
println!("[from stream]: {:?}", res);
}

Ok(())
}

async fn notify(pool: sqlx::PgPool) {
let mut conn = match pool.acquire().await {
Ok(conn) => conn,
Err(err) => return println!("[from notify]: {:?}", err),
};
let res = sqlx::Executor::send(
&mut conn,
r#"
NOTIFY "chan0", '{"payload": 0}';
NOTIFY "chan1", '{"payload": 1}';
NOTIFY "chan2", '{"payload": 2}';
"#,
)
.await;
println!("[from notify]: {:?}", res);
}

0 comments on commit 59b2acb

Please sign in to comment.