Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ds): ds list #2066

Open
wants to merge 1 commit into
base: 02-21-fix_job-runner_fix_build_pack_in_job-runner_dockerfile
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/ctx/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn wait_for_workflow<W: Workflow>(
.await?
}

/// Finds the first workflow with the given tags.
/// Finds the first incomplete workflow with the given tags.
pub async fn find_workflow<W: Workflow>(
db: &DatabaseHandle,
tags: impl AsTags,
Expand Down
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl OperationCtx {
)
}

/// Finds the first workflow with the given tags.
/// Finds the first incomplete workflow with the given tags.
pub async fn find_workflow<W: Workflow>(
&self,
tags: impl AsTags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,6 @@ impl Database for DatabaseFdbSqliteNats {
}
}

tracing::info!(?current_workflow_name, ?current_state, "-----");

if let Some(workflow_name) = current_workflow_name {
let entry = other_workflow_counts
.entry(workflow_name.clone())
Expand All @@ -517,8 +515,6 @@ impl Database for DatabaseFdbSqliteNats {
}
}

tracing::info!(?other_workflow_counts, "-----");

Result::<_, fdb::FdbBindingError>::Ok(())
},
async {
Expand Down
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub trait Database: Send {
/// Retrieves a workflow with the given ID.
async fn get_workflow(&self, workflow_id: Uuid) -> WorkflowResult<Option<WorkflowData>>;

/// Retrieves a workflow with the given name and tags.
/// Retrieves the first incomplete workflow with the given name and tags.
async fn find_workflow(
&self,
workflow_name: &str,
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/services/ds/src/keys/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<'de> TupleUnpack<'de> for ServerKey {
}
}

#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ServerKeyData {
pub is_destroyed: bool,
pub tags: Vec<(String, String)>,
Expand Down
45 changes: 45 additions & 0 deletions packages/edge/services/ds/src/keys/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,51 @@ impl<'de> TupleUnpack<'de> for CreateTsKey {
}
}

#[derive(Debug)]
pub struct WorkflowIdKey {
server_id: Uuid,
}

impl WorkflowIdKey {
pub fn new(server_id: Uuid) -> Self {
WorkflowIdKey { server_id }
}
}

impl FormalKey for WorkflowIdKey {
type Value = Uuid;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
Ok(Uuid::from_slice(raw)?)
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
Ok(value.as_bytes().to_vec())
}
}

impl TuplePack for WorkflowIdKey {
fn pack<W: std::io::Write>(
&self,
w: &mut W,
tuple_depth: TupleDepth,
) -> std::io::Result<VersionstampOffset> {
let t = ("server", "data", self.server_id, "workflow_id");
t.pack(w, tuple_depth)
}
}

impl<'de> TupleUnpack<'de> for WorkflowIdKey {
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
let (input, (_, _, server_id, _)) =
<(Cow<str>, Cow<str>, Uuid, Cow<str>)>::unpack(input, tuple_depth)?;

let v = WorkflowIdKey { server_id };

Ok((input, v))
}
}

#[derive(Debug)]
pub struct ProxiedPortsKey {
pub server_id: Uuid,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chirp_workflow::prelude::*;
use fdb_util::{FormalKey, SNAPSHOT, end_of_key_range};
use fdb_util::{end_of_key_range, FormalKey, SNAPSHOT};
use foundationdb::{
self as fdb,
options::{ConflictRangeType, StreamingMode},
Expand Down
55 changes: 42 additions & 13 deletions packages/edge/services/ds/src/ops/server/get.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::{collections::HashMap, convert::TryInto};

use chirp_workflow::prelude::*;
use fdb_util::{FormalKey, SERIALIZABLE};
use foundationdb as fdb;
use futures_util::{StreamExt, TryStreamExt};

use crate::types::{
EndpointType, GameGuardProtocol, HostProtocol, NetworkMode, Port, Routing, Server,
ServerLifecycle, ServerResources,
use crate::{
keys,
types::{
EndpointType, GameGuardProtocol, HostProtocol, NetworkMode, Port, Routing, Server,
ServerLifecycle, ServerResources,
},
};

#[derive(sqlx::FromRow)]
#[derive(Debug, sqlx::FromRow)]
struct ServerRow {
env_id: Uuid,
tags: sqlx::types::Json<HashMap<String, String>>,
Expand Down Expand Up @@ -78,14 +83,38 @@ pub struct Output {

#[operation]
pub async fn ds_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
let server_data = futures_util::stream::iter(input.server_ids.clone())
.map(|server_id| async move {
let Some(workflow_id) = ctx
.find_workflow::<crate::workflows::server::Workflow>(("server_id", server_id))
.await?
else {
return GlobalResult::Ok(None);
};
let servers_with_wf_ids = ctx
.fdb()
.await?
.run(|tx, _mc| async move {
futures_util::stream::iter(input.server_ids.clone())
.map(|server_id| {
let tx = tx.clone();
async move {
let workflow_id_key = keys::server::WorkflowIdKey::new(server_id);
let workflow_id_entry = tx
.get(&keys::subspace().pack(&workflow_id_key), SERIALIZABLE)
.await?;

let workflow_id = workflow_id_key
.deserialize(&workflow_id_entry.ok_or(
fdb::FdbBindingError::CustomError(
format!("key should exist: {workflow_id_key:?}").into(),
),
)?)
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;

Ok((server_id, workflow_id))
}
})
.buffer_unordered(1024)
.try_collect::<Vec<_>>()
.await
})
.await?;

let server_data = futures_util::stream::iter(servers_with_wf_ids)
.map(|(server_id, workflow_id)| async move {
let pool = &ctx.sqlite_for_workflow(workflow_id).await?;

let (server_row, pb_row, port_ingress_rows, port_host_rows, proxied_port_rows) = tokio::try_join!(
Expand Down Expand Up @@ -144,7 +173,7 @@ pub async fn ds_server_get(ctx: &OperationCtx, input: &Input) -> GlobalResult<Ou
),
)?;

Ok(Some(ServerData {
GlobalResult::Ok(Some(ServerData {
server_id,
row: server_row,
pb_row,
Expand Down
5 changes: 3 additions & 2 deletions packages/edge/services/ds/src/ops/server/list_for_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub async fn ds_server_list_for_env(ctx: &OperationCtx, input: &Input) -> Global
let create_ts = create_ts_key
.deserialize(&entry)
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?;

keys::subspace().pack(&keys::env::ServerKey::new(
input.env_id,
create_ts,
Expand Down Expand Up @@ -75,10 +76,10 @@ pub async fn ds_server_list_for_env(ctx: &OperationCtx, input: &Input) -> Global

if input.include_destroyed || !data.is_destroyed {
// Compute intersection between ds tags and input tags
let tags_match = data
let tags_match = input
.tags
.iter()
.all(|(k, v)| input.tags.iter().any(|(k2, v2)| k == k2 && v == v2));
.all(|(k, v)| data.tags.iter().any(|(k2, v2)| k == k2 && v == v2));

if tags_match {
results.push(server_key.server_id);
Expand Down
12 changes: 10 additions & 2 deletions packages/edge/services/ds/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,22 @@ async fn populate_fdb_idx(ctx: &ActivityCtx, input: &PopulateFdbIdxInput) -> Glo
.await?
.run(|tx, _mc| async move {
let create_ts_key = keys::server::CreateTsKey::new(input.server_id);

tx.set(
&keys::subspace().pack(&create_ts_key),
&create_ts_key
.serialize(input.create_ts)
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
);

let workflow_id_key = keys::server::WorkflowIdKey::new(input.server_id);
tx.set(
&keys::subspace().pack(&workflow_id_key),
&workflow_id_key
.serialize(ctx.workflow_id())
.map_err(|x| fdb::FdbBindingError::CustomError(x.into()))?,
);

// Add env index key
let server_key =
keys::env::ServerKey::new(input.env_id, input.create_ts, input.server_id);
let data = keys::env::ServerKeyData {
Expand Down Expand Up @@ -635,7 +643,7 @@ struct UpdateImageInput {
#[activity(UpdateImage)]
async fn update_image(ctx: &ActivityCtx, input: &UpdateImageInput) -> GlobalResult<()> {
let pool = ctx.sqlite().await?;

sql_execute!(
[ctx, pool]
"
Expand Down
Loading