Add dynamic proxy configuration #150

Merged 4 commits on Dec 7, 2020
6 changes: 3 additions & 3 deletions cloudbuild.yaml
@@ -18,11 +18,11 @@ steps:
   args:
   - '-c'
   - |
-    git clone --depth 100 https://github.com/envoyproxy/data-plane-api.git proto/data-plane-api
+    git clone --depth 400 https://github.com/envoyproxy/data-plane-api.git proto/data-plane-api
     git -C proto/data-plane-api checkout b84d3bea45b59abc3fd21fba26140a379461fc67
-    git clone --depth 100 https://github.com/cncf/udpa.git proto/udpa
+    git clone --depth 400 https://github.com/cncf/udpa.git proto/udpa
     git -C proto/udpa checkout efcf912fb35470672231c7b7bef620f3d17f655a
-    git clone --depth 100 https://github.com/envoyproxy/protoc-gen-validate.git proto/protoc-gen-validate
+    git clone --depth 400 https://github.com/envoyproxy/protoc-gen-validate.git proto/protoc-gen-validate
     git -C proto/protoc-gen-validate checkout e84d38a1a4c27d4662779c31a06528cdbc6b4b4f
     git clone --depth 400 https://github.com/googleapis/googleapis.git proto/googleapis
     git -C proto/googleapis checkout 2db5725bf898b544a0cf951e1694d3b0fce5eda3
27 changes: 27 additions & 0 deletions docs/proxy-configuration.md
@@ -53,13 +53,40 @@ properties:
    type: object
    description: |
      Static configuration of endpoints and filters.
      NOTE: Exactly one of `static` or `dynamic` can be specified.
    properties:
      filter:
        '$ref': '#/definitions/filterchain'
      endpoints:
        '$ref': '#/definitions/endpoints'
    required:
      - endpoints
  dynamic:
[Review comment] markmandel (Contributor), Dec 7, 2020:

Should this be a oneOf with children of dynamic and static? https://swagger.io/docs/specification/data-models/oneof-anyof-allof-not/#oneof

Or would that be too complicated for the documentation? (Not for this PR, but maybe we should split this up into separate docs)

Actually, this would be the link for oneOf, yes? http://json-schema.org/draft-04/json-schema-validation.html#rfc.section.5.5

[Reply] Collaborator (Author):

Yeah, ideally it would be oneOf, but oneOf is a bit weird at the top level: instead of being able to say oneOf static or dynamic at the top level, we can only say oneOf, which might be more complex.

[Reply] markmandel (Contributor):

Sounds good - just wanted to check 👍

    type: object
    description: |
      Dynamic configuration of endpoints and filters.
      NOTE: Exactly one of `static` or `dynamic` can be specified.
    properties:
      filter:
        '$ref': '#/definitions/filterchain'
      management_servers:
        type: array
        description: |
          A list of XDS management servers to fetch configuration from.
          Multiple servers can be provided for redundancy, for the proxy to
          fall back to upon error.
        items:
          type: object
          description: |
            Configuration for a management server.
          properties:
            address:
              type: string
              description: |
                Address of the management server. This must have the `http(s)` scheme prefix.
                Example: `http://example.com`
    required:
      - management_servers

required:
  - version
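For reference, a configuration that exercises the new `dynamic` section might look like the following sketch (adapted from the `examples/control-plane.yaml` file added later in this PR; the server address is illustrative and includes the `http` scheme prefix the schema asks for):

```yaml
# Hypothetical dynamic configuration; the address is an example value.
version: v1alpha1
proxy:
  mode: SERVER
  id: my-proxy
  port: 7001
dynamic:
  management_servers:
    - address: http://127.0.0.1:26000
```

As for the oneOf question raised in the review thread above, expressing "exactly one of `static` or `dynamic`" at the top level could look roughly like this (hypothetical sketch; the PR keeps the prose NOTE instead):

```yaml
# Hypothetical top-level oneOf; not part of this PR.
oneOf:
  - required: [static]
  - required: [dynamic]
```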
15 changes: 15 additions & 0 deletions docs/proxy.md
@@ -0,0 +1,15 @@
### Proxy


#### Metrics

The proxy exposes the following core metrics:

- `quilkin_proxy_packets_dropped_total{reason}` (Counter)

  The total number of packets (not associated with any session) that were dropped by the proxy.
  Note that packets reflected by this metric were dropped at an earlier stage, before they were associated with any session. For session based metrics, see the list of [session metrics][session-metrics] instead.

  * `reason = NoConfiguredEndpoints`: No upstream endpoints were available to send the packet to. This can occur, e.g., if the endpoints cluster was scaled down to zero and the proxy is configured via a control plane.

[session-metrics]: ./session.md
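As an illustration of how such a labelled counter is typically wired up in Rust, the sketch below uses the `prometheus` crate. The metric name and `reason` label come from the docs above; the registration helper itself is hypothetical and not necessarily how Quilkin implements it:

```rust
use prometheus::{IntCounterVec, Opts, Registry};

// Hypothetical helper: registers a counter matching the metric documented above.
fn register_dropped_packets_counter(registry: &Registry) -> prometheus::Result<IntCounterVec> {
    let counter = IntCounterVec::new(
        Opts::new(
            "quilkin_proxy_packets_dropped_total",
            "Total number of packets dropped by the proxy",
        ),
        &["reason"], // one label: the drop reason
    )?;
    registry.register(Box::new(counter.clone()))?;
    Ok(counter)
}

// Usage: increment when no upstream endpoints are configured, e.g.
// counter.with_label_values(&["NoConfiguredEndpoints"]).inc();
```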
29 changes: 29 additions & 0 deletions examples/control-plane.yaml
@@ -0,0 +1,29 @@
#
# Copyright 2020 Google LLC All Rights Reserved.
[Review comment] markmandel (Contributor), Dec 7, 2020:

Should we also add an entry to https://github.com/googleforgames/quilkin/blob/master/docs/proxy-configuration.md while we are at it in this PR?

(Also just noted this page is not referenced from the home page, but that's a separate issue)

[Reply] Collaborator (Author):

Yeah, good catch, will fix!

[Reply] Collaborator (Author):

Updated!

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Example configuration for a Quilkin Proxy that is configured via a control plane.
#

version: v1alpha1
proxy:
  mode: SERVER # Run the proxy in server mode.
  id: my-proxy # An identifier for the proxy instance.
  port: 7001 # The port on which to receive traffic locally.
dynamic: # Provide configuration of endpoints using an XDS management server.
  management_servers: # Array of management servers to configure the proxy with.
                      # Multiple servers can be provided for redundancy.
    - address: 127.0.0.1:26000
109 changes: 50 additions & 59 deletions src/cluster/cluster_manager.rs
@@ -21,23 +21,23 @@
 // and we will need to acquire a read lock with every packet that is processed
 // to be able to capture the current endpoint state and pass it to Filters.
 use parking_lot::RwLock;
-use slog::{debug, warn, Logger};
+use slog::{debug, o, warn, Logger};
 use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::{fmt, sync::Arc};
 use tokio::sync::{mpsc, oneshot, watch};

-use crate::config::{EmptyListError, EndPoint, Endpoints, UpstreamEndpoints};
+use crate::config::{EmptyListError, EndPoint, Endpoints, ManagementServer, UpstreamEndpoints};
 use crate::xds::ads_client::{AdsClient, ClusterUpdate, ExecutionResult};

 /// The max size of queue that provides updates from the XDS layer to the [`ClusterManager`].
 const CLUSTER_UPDATE_QUEUE_SIZE: usize = 1000;

-type Clusters = HashMap<String, Vec<SocketAddr>>;
+pub type SharedClusterManager = Arc<RwLock<ClusterManager>>;

 /// ClusterManager knows about all clusters and endpoints.
 pub struct ClusterManager {
-    clusters: Clusters,
+    endpoints: Option<Endpoints>,
 }

 /// InitializeError is returned with an error message if the
@@ -56,72 +56,50 @@ impl fmt::Display for InitializeError {
 impl std::error::Error for InitializeError {}

 impl ClusterManager {
-    fn new(clusters: Clusters) -> Self {
-        Self { clusters }
+    fn new(endpoints: Option<Endpoints>) -> Self {
+        Self { endpoints }
     }

-    fn update(&mut self, clusters: Clusters) {
-        self.clusters = clusters;
+    fn update(&mut self, endpoints: Option<Endpoints>) {
+        self.endpoints = endpoints;
     }

     /// Returns all endpoints known at the time of invocation.
     /// Returns `None` if there are no endpoints.
     pub fn get_all_endpoints(&self) -> Option<UpstreamEndpoints> {
-        let endpoints = self
-            .clusters
-            .iter()
-            .map(|(name, addresses)| {
-                addresses
-                    .iter()
-                    .map(move |addr| EndPoint::new(name.clone(), *addr, vec![]))
-            })
-            .flatten()
-            .collect();
-
-        match Endpoints::new(endpoints) {
-            Ok(endpoints) => Some(endpoints.into()),
-            Err(EmptyListError) => None,
-        }
+        self.endpoints.clone().map(|ep| ep.into())
     }

     /// Returns a ClusterManager backed by the fixed set of clusters provided in the config.
-    pub fn fixed(endpoints: &[(String, SocketAddr)]) -> ClusterManager {
-        Self::new(
-            endpoints
-                .iter()
-                .cloned()
-                .map(|(name, addr)| (name, vec![addr]))
-                .collect(),
-        )
+    pub fn fixed(endpoints: Vec<EndPoint>) -> SharedClusterManager {
+        Arc::new(RwLock::new(Self::new(Some(
+            Endpoints::new(endpoints)
+                .expect("endpoints list in config should be validated non-empty"),
+        ))))
     }

     /// Returns a ClusterManager backed by a set of XDS servers.
     /// This function starts an XDS client in the background that talks to
     /// one of the provided servers.
-    /// Multiple servers are provided for redundancy - the servers will be
+    /// Multiple management servers can be provided for redundancy - the servers will be
     /// connected to in turn only in the case of failure.
     /// The set of clusters is continuously updated based on responses
     /// from the XDS server.
     /// The returned result contains the XDS client's execution result after termination.
-    async fn dynamic<'a>(
-        log: Logger,
-        server_addresses: Vec<String>,
-        xds_node_id: Option<String>,
+    pub async fn from_xds<'a>(
+        base_logger: Logger,
+        management_servers: Vec<ManagementServer>,
+        xds_node_id: String,
         mut shutdown_rx: watch::Receiver<()>,
-    ) -> Result<
-        (
-            Arc<RwLock<ClusterManager>>,
-            oneshot::Receiver<ExecutionResult>,
-        ),
-        InitializeError,
-    > {
+    ) -> Result<(SharedClusterManager, oneshot::Receiver<ExecutionResult>), InitializeError> {
+        let log = base_logger.new(o!("source" => "cluster::ClusterManager"));
         let (cluster_updates_tx, mut cluster_updates_rx) =
             mpsc::channel::<ClusterUpdate>(CLUSTER_UPDATE_QUEUE_SIZE);
         let (execution_result_tx, execution_result_rx) = oneshot::channel::<ExecutionResult>();
         Self::spawn_ads_client(
             log.clone(),
-            xds_node_id.unwrap_or_default(),
-            server_addresses,
+            xds_node_id,
+            management_servers,
             cluster_updates_tx,
             execution_result_tx,
             shutdown_rx.clone(),
Expand All @@ -132,7 +110,7 @@ impl ClusterManager {
let cluster_update =
Self::receive_initial_cluster_update(&mut cluster_updates_rx, &mut shutdown_rx).await?;

-        let cluster_manager = Arc::new(RwLock::new(Self::new(Self::create_clusters_from_update(
+        let cluster_manager = Arc::new(RwLock::new(Self::new(Self::create_endpoints_from_update(
cluster_update,
))));

@@ -148,27 +126,40 @@ impl ClusterManager {
Ok((cluster_manager, execution_result_rx))
}

-    fn create_clusters_from_update(update: ClusterUpdate) -> Clusters {
-        update
+    fn create_endpoints_from_update(update: ClusterUpdate) -> Option<Endpoints> {
+        // NOTE: We don't currently have support for consuming multiple clusters
+        // so here we gather all endpoints into the same set, ignoring what cluster
+        // they belong to.
+        let endpoints = update
             .into_iter()
-            .map(|(name, cluster)| {
-                let addresses = cluster
+            .fold(vec![], |mut endpoints, (_name, cluster)| {
+                let cluster_endpoints = cluster
                     .localities
                     .into_iter()
-                    .map(|(_, endpoints)| endpoints.endpoints.into_iter().map(|ep| ep.address))
-                    .flatten()
-                    .collect::<Vec<_>>();
-                (name, addresses)
-            })
-            .collect()
+                    .map(|(_, endpoints)| {
+                        endpoints
+                            .endpoints
+                            .into_iter()
+                            .map(|ep| EndPoint::new("N/A".into(), ep.address, vec![]))
+                    })
+                    .flatten();
+                endpoints.extend(cluster_endpoints);
+
+                endpoints
+            });
+
+        match Endpoints::new(endpoints) {
+            Ok(endpoints) => Some(endpoints),
+            Err(EmptyListError) => None,
+        }
}

// Spawns a task that runs an ADS client. Cluster updates from the client
// as well as execution result after termination are sent on the provided channels.
fn spawn_ads_client(
log: Logger,
node_id: String,
-        server_addresses: Vec<String>,
+        management_servers: Vec<ManagementServer>,
cluster_updates_tx: mpsc::Sender<ClusterUpdate>,
execution_result_tx: oneshot::Sender<ExecutionResult>,
shutdown_rx: watch::Receiver<()>,
@@ -178,7 +169,7 @@ impl ClusterManager {
.run(
log.clone(),
node_id,
-                server_addresses,
+                management_servers,
cluster_updates_tx,
shutdown_rx,
)
@@ -227,7 +218,7 @@ impl ClusterManager {
update = cluster_updates_rx.recv() => {
match update {
Some(update) => {
-                        let update = Self::create_clusters_from_update(update);
+                        let update = Self::create_endpoints_from_update(update);
debug!(log, "Received a cluster update.");
cluster_manager.write().update(update);
}
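To make the refactored API concrete, a caller might use the new `ClusterManager` shape roughly as follows. This is a sketch based only on the signatures in the diff above: `EndPoint`, `ClusterManager`, and `SharedClusterManager` are the crate-internal types shown there, and the surrounding setup is hypothetical.

```rust
// Sketch only: assumes crate-internal types from this diff are in scope.
fn example_read_path() {
    // Static mode: build a shared manager from config-validated endpoints.
    let manager: SharedClusterManager = ClusterManager::fixed(vec![EndPoint::new(
        "cluster-a".into(),
        "127.0.0.1:8080".parse().unwrap(),
        vec![],
    )]);

    // Per-packet read path: take a read lock and grab the current endpoint set.
    // `None` means no upstream endpoints are available, so the packet is dropped
    // (and `quilkin_proxy_packets_dropped_total` would be incremented).
    match manager.read().get_all_endpoints() {
        Some(_upstream_endpoints) => { /* pass to the filter chain */ }
        None => { /* drop the packet */ }
    }
}
```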
9 changes: 8 additions & 1 deletion src/cluster/mod.rs
@@ -18,7 +18,14 @@ use std::collections::HashMap;
use std::net::SocketAddr;

#[cfg(not(doctest))]
-mod cluster_manager;
+pub(crate) mod cluster_manager;
+
+// Stub module to work-around not including cluster_manager in doc tests.
+#[cfg(doctest)]
+pub(crate) mod cluster_manager {
+    pub struct ClusterManager;
+    pub struct SharedClusterManager;
+}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Endpoint {
24 changes: 19 additions & 5 deletions src/config/error.rs
@@ -1,12 +1,19 @@
 use crate::extensions::Error as FilterRegistryError;
 use std::fmt::{self, Display, Formatter};

+#[derive(Debug, PartialEq)]
+pub struct ValueInvalidArgs {
+    pub field: String,
+    pub clarification: Option<String>,
+    pub examples: Option<Vec<String>>,
+}
+
 /// Validation failure for a Config
 #[derive(Debug, PartialEq)]
 pub enum ValidationError {
     NotUnique(String),
     EmptyList(String),
-    ValueInvalid(String, Option<Vec<String>>),
+    ValueInvalid(ValueInvalidArgs),
     FilterInvalid(FilterRegistryError),
 }

@@ -15,11 +22,18 @@ impl Display for ValidationError {
         match self {
             ValidationError::NotUnique(field) => write!(f, "field {} is not unique", field),
             ValidationError::EmptyList(field) => write!(f, "field {} cannot be empty", field),
-            ValidationError::ValueInvalid(field, examples) => write!(
+            ValidationError::ValueInvalid(args) => write!(
                 f,
-                "{} has an invalid value {}",
-                field,
-                examples.as_ref().map(|v| v.join(",")).unwrap_or_default()
+                "{} has an invalid value{}{}",
+                args.field,
+                args.clarification
+                    .as_ref()
+                    .map(|v| format!(": {}", v))
+                    .unwrap_or_default(),
+                args.examples
+                    .as_ref()
+                    .map(|v| format!(" examples: {}", v.join(",")))
+                    .unwrap_or_default()
             ),
             ValidationError::FilterInvalid(reason) => {
                 write!(f, "filter configuration is invalid: {}", reason)
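For illustration, the new error shape could be constructed and rendered like this. This is a sketch: `ValidationError` and `ValueInvalidArgs` are the crate-internal types from the diff above, and the field name and messages are made up.

```rust
// Sketch: exercising the new ValueInvalidArgs shape from this diff.
// The field, clarification, and example values are hypothetical.
fn example() -> String {
    let err = ValidationError::ValueInvalid(ValueInvalidArgs {
        field: "dynamic.management_servers.address".into(),
        clarification: Some("address must include a scheme".into()),
        examples: Some(vec!["http://example.com".into()]),
    });

    // Renders roughly as:
    // "dynamic.management_servers.address has an invalid value: address
    //  must include a scheme examples: http://example.com"
    err.to_string()
}
```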