fix(ds): ds list
MasterPtato committed Feb 21, 2025 · 1 parent d167037 · commit a7a1b58
Showing 10 changed files with 105 additions and 26 deletions.
packages/common/chirp-workflow/core/src/ctx/common.rs (2 changes: 1 addition & 1 deletion)
@@ -56,7 +56,7 @@ pub async fn wait_for_workflow<W: Workflow>(
.await?
}

- /// Finds the first workflow with the given tags.
+ /// Finds the first incomplete workflow with the given tags.
pub async fn find_workflow<W: Workflow>(
db: &DatabaseHandle,
tags: impl AsTags,
packages/common/chirp-workflow/core/src/ctx/operation.rs (2 changes: 1 addition & 1 deletion)
@@ -100,7 +100,7 @@ impl OperationCtx {
)
}

- /// Finds the first workflow with the given tags.
+ /// Finds the first incomplete workflow with the given tags.
pub async fn find_workflow<W: Workflow>(
&self,
tags: impl AsTags,
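Note: the clarified docs change what callers can infer from `None`. A minimal sketch of a call site (hypothetical usage; assumes `ctx` is an OperationCtx and `server_id` a bound Uuid, mirroring the old lookup removed from ops/server/get.rs below):

    // `find_workflow` now documents that it matches only *incomplete*
    // workflows, so `None` can mean "never existed" or "already completed".
    let maybe_workflow_id = ctx
        .find_workflow::<crate::workflows::server::Workflow>(("server_id", server_id))
        .await?;
    if maybe_workflow_id.is_none() {
        // The server workflow may simply have finished; don't treat this
        // as proof that the server never existed.
    }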
@@ -492,8 +492,6 @@ impl Database for DatabaseFdbSqliteNats {
}
}

- tracing::info!(?current_workflow_name, ?current_state, "-----");

if let Some(workflow_name) = current_workflow_name {
let entry = other_workflow_counts
.entry(workflow_name.clone())
@@ -517,8 +515,6 @@ impl Database for DatabaseFdbSqliteNats {
}
}

- tracing::info!(?other_workflow_counts, "-----");

Result::<_, fdb::FdbBindingError>::Ok(())
},
async {
packages/common/chirp-workflow/core/src/db/mod.rs (2 changes: 1 addition & 1 deletion)
@@ -87,7 +87,7 @@ pub trait Database: Send {
/// Retrieves a workflow with the given ID.
async fn get_workflow(&self, workflow_id: Uuid) -> WorkflowResult<Option<WorkflowData>>;

- /// Retrieves a workflow with the given name and tags.
+ /// Retrieves the first incomplete workflow with the given name and tags.
async fn find_workflow(
&self,
workflow_name: &str,
packages/edge/services/ds/src/keys/env.rs (2 changes: 1 addition & 1 deletion)
@@ -69,7 +69,7 @@ impl<'de> TupleUnpack<'de> for ServerKey {
}
}

- #[derive(Serialize, Deserialize)]
+ #[derive(Debug, Serialize, Deserialize)]
pub struct ServerKeyData {
pub is_destroyed: bool,
pub tags: Vec<(String, String)>,
packages/edge/services/ds/src/keys/server.rs (45 changes: 45 additions & 0 deletions)
@@ -51,6 +51,51 @@ impl<'de> TupleUnpack<'de> for CreateTsKey {
}
}

+ #[derive(Debug)]
+ pub struct WorkflowIdKey {
+ server_id: Uuid,
+ }
+
+ impl WorkflowIdKey {
+ pub fn new(server_id: Uuid) -> Self {
+ WorkflowIdKey { server_id }
+ }
+ }
+
+ impl FormalKey for WorkflowIdKey {
+ type Value = Uuid;
+
+ fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
+ Ok(Uuid::from_slice(raw)?)
+ }
+
+ fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
+ Ok(value.as_bytes().to_vec())
+ }
+ }
+
+ impl TuplePack for WorkflowIdKey {
+ fn pack<W: std::io::Write>(
+ &self,
+ w: &mut W,
+ tuple_depth: TupleDepth,
+ ) -> std::io::Result<VersionstampOffset> {
+ let t = ("server", "data", self.server_id, "workflow_id");
+ t.pack(w, tuple_depth)
+ }
+ }
+
+ impl<'de> TupleUnpack<'de> for WorkflowIdKey {
+ fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
+ let (input, (_, _, server_id, _)) =
+ <(Cow<str>, Cow<str>, Uuid, Cow<str>)>::unpack(input, tuple_depth)?;
+
+ let v = WorkflowIdKey { server_id };
+
+ Ok((input, v))
+ }
+ }

#[derive(Debug)]
pub struct ProxiedPortsKey {
pub server_id: Uuid,
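For orientation, a round-trip sketch of the new key added above (names from this diff; error handling and the surrounding function are elided, so this is illustrative only):

    // WorkflowIdKey maps a server ID to the UUID of its owning workflow.
    let key = keys::server::WorkflowIdKey::new(server_id);
    // TuplePack produces the tuple ("server", "data", server_id, "workflow_id").
    let packed = keys::subspace().pack(&key);
    // The value side is the UUID's 16 raw bytes, so serialize/deserialize
    // round-trip exactly.
    let raw = key.serialize(workflow_id)?;
    assert_eq!(key.deserialize(&raw)?, workflow_id);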
@@ -1,5 +1,5 @@
use chirp_workflow::prelude::*;
- use fdb_util::{FormalKey, SNAPSHOT, end_of_key_range};
+ use fdb_util::{end_of_key_range, FormalKey, SNAPSHOT};
use foundationdb::{
self as fdb,
options::{ConflictRangeType, StreamingMode},
packages/edge/services/ds/src/ops/server/get.rs (55 changes: 42 additions & 13 deletions)
@@ -1,14 +1,19 @@
use std::{collections::HashMap, convert::TryInto};

use chirp_workflow::prelude::*;
+ use fdb_util::{FormalKey, SERIALIZABLE};
+ use foundationdb as fdb;
use futures_util::{StreamExt, TryStreamExt};

- use crate::types::{
- EndpointType, GameGuardProtocol, HostProtocol, NetworkMode, Port, Routing, Server,
- ServerLifecycle, ServerResources,
+ use crate::{
+ keys,
+ types::{
+ EndpointType, GameGuardProtocol, HostProtocol, NetworkMode, Port, Routing, Server,
+ ServerLifecycle, ServerResources,
+ },
};

- #[derive(sqlx::FromRow)]
+ #[derive(Debug, sqlx::FromRow)]
struct ServerRow {
env_id: Uuid,
tags: sqlx::types::Json<HashMap<String, String>>,
@@ -78,14 +83,38 @@ pub struct Output {

#[operation]
pub async fn ds_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
- let server_data = futures_util::stream::iter(input.server_ids.clone())
- .map(|server_id| async move {
- let Some(workflow_id) = ctx
- .find_workflow::<crate::workflows::server::Workflow>(("server_id", server_id))
- .await?
- else {
- return GlobalResult::Ok(None);
- };
+ let servers_with_wf_ids = ctx
+ .fdb()
+ .await?
+ .run(|tx, _mc| async move {
+ futures_util::stream::iter(input.server_ids.clone())
+ .map(|server_id| {
+ let tx = tx.clone();
+ async move {
+ let workflow_id_key = keys::server::WorkflowIdKey::new(server_id);
+ let workflow_id_entry = tx
+ .get(&keys::subspace().pack(&workflow_id_key), SERIALIZABLE)
+ .await?;
+
+ let workflow_id = workflow_id_key
+ .deserialize(&workflow_id_entry.ok_or(
+ fdb::FdbBindingError::CustomError(
+ format!("key should exist: {workflow_id_key:?}").into(),
+ ),
+ )?)
+ .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
+ Ok((server_id, workflow_id))
+ }
+ })
+ .buffer_unordered(1024)
+ .try_collect::<Vec<_>>()
+ .await
+ })
+ .await?;
+
+ let server_data = futures_util::stream::iter(servers_with_wf_ids)
+ .map(|(server_id, workflow_id)| async move {
let pool = &ctx.sqlite_for_workflow(workflow_id).await?;

let (server_row, pb_row, port_ingress_rows, port_host_rows, proxied_port_rows) = tokio::try_join!(
@@ -144,7 +173,7 @@ pub async fn ds_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
),
)?;

- Ok(Some(ServerData {
+ GlobalResult::Ok(Some(ServerData {
server_id,
row: server_row,
pb_row,
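The substance of this rewrite: ds_server_get no longer calls find_workflow for each server (a tag lookup that, per the doc change above, also misses completed workflows) and instead point-reads the new WorkflowIdKey for every requested server inside one FDB transaction, up to 1024 reads in flight. A condensed sketch of the per-server read (simplified from the code above):

    // One SERIALIZABLE point read per server. The key is written when the
    // server is created (see populate_fdb_idx below), so a missing entry is
    // surfaced as an error instead of silently dropping the server.
    let key = keys::server::WorkflowIdKey::new(server_id);
    let entry = tx.get(&keys::subspace().pack(&key), SERIALIZABLE).await?;
    let workflow_id = key.deserialize(&entry.expect("key should exist"))?;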
packages/edge/services/ds/src/ops/server/list_for_env.rs (5 changes: 3 additions & 2 deletions)
@@ -42,6 +42,7 @@ pub async fn ds_server_list_for_env(ctx: &OperationCtx, input: &Input) -> Global
let create_ts = create_ts_key
.deserialize(&entry)
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;
+
keys::subspace().pack(&keys::env::ServerKey::new(
input.env_id,
create_ts,
@@ -75,10 +76,10 @@ pub async fn ds_server_list_for_env(ctx: &OperationCtx, input: &Input) -> Global

if input.include_destroyed || !data.is_destroyed {
// Compute intersection between ds tags and input tags
- let tags_match = data
+ let tags_match = input
.tags
.iter()
- .all(|(k, v)| input.tags.iter().any(|(k2, v2)| k == k2 && v == v2));
+ .all(|(k, v)| data.tags.iter().any(|(k2, v2)| k == k2 && v == v2));

if tags_match {
results.push(server_key.server_id);
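This two-line swap is the actual `ds list` bug: the containment check ran in the wrong direction. Previously every server tag had to appear in the request's filter, so a server carrying any extra tags never matched; now every filter tag must be present on the server. A minimal illustration with hypothetical values:

    let data_tags = [("env", "prod"), ("region", "atl")]; // tags on the server
    let input_tags = [("env", "prod")]; // filter from the request

    // Fixed direction (input ⊆ data): the server matches.
    assert!(input_tags
        .iter()
        .all(|(k, v)| data_tags.iter().any(|(k2, v2)| k == k2 && v == v2)));

    // Old direction (data ⊆ input): ("region", "atl") is absent from the
    // filter, so this server was wrongly excluded.
    assert!(!data_tags
        .iter()
        .all(|(k, v)| input_tags.iter().any(|(k2, v2)| k == k2 && v == v2)));

Note the empty-filter case also flips from matching almost nothing to matching every server, which is presumably what a bare list call expects.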
packages/edge/services/ds/src/workflows/server/mod.rs (12 changes: 10 additions & 2 deletions)
@@ -505,14 +505,22 @@ async fn populate_fdb_idx(ctx: &ActivityCtx, input: &PopulateFdbIdxInput) -> Glo
.await?
.run(|tx, _mc| async move {
let create_ts_key = keys::server::CreateTsKey::new(input.server_id);

tx.set(
&keys::subspace().pack(&create_ts_key),
&create_ts_key
.serialize(input.create_ts)
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
);

+ let workflow_id_key = keys::server::WorkflowIdKey::new(input.server_id);
+ tx.set(
+ &keys::subspace().pack(&workflow_id_key),
+ &workflow_id_key
+ .serialize(ctx.workflow_id())
+ .map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
+ );

// Add env index key
let server_key =
keys::env::ServerKey::new(input.env_id, input.create_ts, input.server_id);
let data = keys::env::ServerKeyData {
@@ -635,7 +643,7 @@ struct UpdateImageInput {
#[activity(UpdateImage)]
async fn update_image(ctx: &ActivityCtx, input: &UpdateImageInput) -> GlobalResult<()> {
let pool = ctx.sqlite().await?;

sql_execute!(
[ctx, pool]
"
Expand Down

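Taken together with the get.rs change, populate_fdb_idx now writes two per-server records plus the env index entry in one transaction. A sketch of the resulting layout (the workflow_id tuple is from keys/server.rs above; the create_ts tuple is assumed to mirror it, and the env key's packed form is not shown in this diff):

    // ("server", "data", server_id, "create_ts")   => creation timestamp (assumed layout)
    // ("server", "data", server_id, "workflow_id") => owning workflow UUID, 16 raw bytes
    // keys::env::ServerKey::new(env_id, create_ts, server_id)
    //     => ServerKeyData { is_destroyed, tags } (what list_for_env filters on)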