Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add k8s metrics receiving status #6977

Merged
merged 3 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid
jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet)
jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet)
jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost)

infraOnboardingSubRouter := router.PathPrefix("/api/v1/infra_onboarding").Subrouter()
infraOnboardingSubRouter.HandleFunc("/k8s/status", am.ViewAccess(aH.getK8sInfraOnboardingStatus)).Methods(http.MethodGet)
}

func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/query-service/app/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,3 +597,52 @@ func (aH *APIHandler) getPvcAttributeValues(w http.ResponseWriter, r *http.Reque

aH.Respond(w, values)
}

func (aH *APIHandler) getK8sInfraOnboardingStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

status := model.OnboardingStatus{}

didSendPodMetrics, err := aH.podsRepo.DidSendPodMetrics(ctx)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

if !didSendPodMetrics {
aH.Respond(w, status)
return
}

didSendClusterMetrics, err := aH.podsRepo.DidSendClusterMetrics(ctx)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

didSendNodeMetrics, err := aH.nodesRepo.DidSendNodeMetrics(ctx)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

didSendOptionalPodMetrics, err := aH.podsRepo.IsSendingOptionalPodMetrics(ctx)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

requiredMetadata, err := aH.podsRepo.SendingRequiredMetadata(ctx)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}

status.DidSendPodMetrics = didSendPodMetrics
status.DidSendClusterMetrics = didSendClusterMetrics
status.DidSendNodeMetrics = didSendNodeMetrics
status.IsSendingOptionalPodMetrics = didSendOptionalPodMetrics
status.IsSendingRequiredMetadata = requiredMetadata

aH.Respond(w, status)
}
78 changes: 78 additions & 0 deletions pkg/query-service/app/inframetrics/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,84 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
)

var (
// TODO(srikanthccv): import metadata yaml from receivers and use generated files to check the metrics
podMetricNamesToCheck = []string{
"k8s_pod_cpu_utilization",
"k8s_pod_memory_usage",
"k8s_pod_cpu_request_utilization",
"k8s_pod_memory_request_utilization",
"k8s_pod_cpu_limit_utilization",
"k8s_pod_memory_limit_utilization",
"k8s_container_restarts",
"k8s_pod_phase",
}
nodeMetricNamesToCheck = []string{
"k8s_node_cpu_utilization",
"k8s_node_allocatable_cpu",
"k8s_node_memory_usage",
"k8s_node_allocatable_memory",
"k8s_node_condition_ready",
}
clusterMetricNamesToCheck = []string{
"k8s_daemonset_desired_scheduled_nodes",
"k8s_daemonset_current_scheduled_nodes",
"k8s_deployment_desired",
"k8s_deployment_available",
"k8s_job_desired_successful_pods",
"k8s_job_active_pods",
"k8s_job_failed_pods",
"k8s_job_successful_pods",
"k8s_statefulset_desired_pods",
"k8s_statefulset_current_pods",
}
optionalPodMetricNamesToCheck = []string{
"k8s_pod_cpu_request_utilization",
"k8s_pod_memory_request_utilization",
"k8s_pod_cpu_limit_utilization",
"k8s_pod_memory_limit_utilization",
}

// did they ever send _any_ pod metrics?
didSendPodMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// did they ever send any node metrics?
didSendNodeMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// did they ever send any cluster metrics?
didSendClusterMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// if they ever sent _any_ pod metrics, we assume they know how to send pod metrics
// now, are they sending optional pod metrics such request/limit metrics?
isSendingOptionalPodMetricsQuery = `
SELECT count() FROM %s.%s WHERE metric_name IN (%s)
`

// there should be [cluster, node, namespace, one of (deployment, statefulset, daemonset, cronjob, job)] for each pod
isSendingRequiredMetadataQuery = `
SELECT any(JSONExtractString(labels, 'k8s_cluster_name')) as k8s_cluster_name,
any(JSONExtractString(labels, 'k8s_node_name')) as k8s_node_name,
any(JSONExtractString(labels, 'k8s_namespace_name')) as k8s_namespace_name,
any(JSONExtractString(labels, 'k8s_deployment_name')) as k8s_deployment_name,
any(JSONExtractString(labels, 'k8s_statefulset_name')) as k8s_statefulset_name,
any(JSONExtractString(labels, 'k8s_daemonset_name')) as k8s_daemonset_name,
any(JSONExtractString(labels, 'k8s_cronjob_name')) as k8s_cronjob_name,
any(JSONExtractString(labels, 'k8s_job_name')) as k8s_job_name,
JSONExtractString(labels, 'k8s_pod_name') as k8s_pod_name
FROM %s.%s WHERE metric_name IN (%s)
AND (unix_milli >= (toUnixTimestamp(now() - toIntervalMinute(60)) * 1000))
AND JSONExtractString(labels, 'k8s_namespace_name') NOT IN ('kube-system', 'kube-public', 'kube-node-lease', 'metallb-system')
GROUP BY k8s_pod_name
LIMIT 1 BY k8s_cluster_name, k8s_node_name, k8s_namespace_name
`
)

// getParamsForTopItems returns the step, time series table name and samples table name
// for the top items query. what are we doing here?
// we want to identify the top hosts/pods/nodes quickly, so we use pre-aggregated data
Expand Down
17 changes: 17 additions & 0 deletions pkg/query-service/app/inframetrics/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package inframetrics

import (
"context"
"fmt"
"math"
"sort"
"strings"

"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
Expand Down Expand Up @@ -62,6 +65,20 @@ func (n *NodesRepo) GetNodeAttributeKeys(ctx context.Context, req v3.FilterAttri
return attributeKeysResponse, nil
}

func (n *NodesRepo) DidSendNodeMetrics(ctx context.Context) (bool, error) {
namesStr := "'" + strings.Join(nodeMetricNamesToCheck, "','") + "'"

query := fmt.Sprintf(didSendNodeMetricsQuery,
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)

count, err := n.reader.GetCountOfThings(ctx, query)
if err != nil {
return false, err
}

return count > 0, nil
}

func (n *NodesRepo) GetNodeAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
req.DataSource = v3.DataSourceMetrics
req.AggregateAttribute = metricToUseForNodes
Expand Down
134 changes: 134 additions & 0 deletions pkg/query-service/app/inframetrics/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package inframetrics

import (
"context"
"fmt"
"math"
"sort"
"strings"

"go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
Expand Down Expand Up @@ -105,6 +108,137 @@ func (p *PodsRepo) GetPodAttributeValues(ctx context.Context, req v3.FilterAttri
return attributeValuesResponse, nil
}

func (p *PodsRepo) DidSendPodMetrics(ctx context.Context) (bool, error) {
namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"

query := fmt.Sprintf(didSendPodMetricsQuery,
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)

count, err := p.reader.GetCountOfThings(ctx, query)
if err != nil {
return false, err
}

return count > 0, nil
}

func (p *PodsRepo) DidSendClusterMetrics(ctx context.Context) (bool, error) {
namesStr := "'" + strings.Join(clusterMetricNamesToCheck, "','") + "'"

query := fmt.Sprintf(didSendClusterMetricsQuery,
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)

count, err := p.reader.GetCountOfThings(ctx, query)
if err != nil {
return false, err
}

return count > 0, nil
}

func (p *PodsRepo) IsSendingOptionalPodMetrics(ctx context.Context) (bool, error) {
namesStr := "'" + strings.Join(optionalPodMetricNamesToCheck, "','") + "'"

query := fmt.Sprintf(isSendingOptionalPodMetricsQuery,
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME, namesStr)

count, err := p.reader.GetCountOfThings(ctx, query)
if err != nil {
return false, err
}

return count > 0, nil
}

func (p *PodsRepo) SendingRequiredMetadata(ctx context.Context) ([]model.PodOnboardingStatus, error) {
namesStr := "'" + strings.Join(podMetricNamesToCheck, "','") + "'"

query := fmt.Sprintf(isSendingRequiredMetadataQuery,
constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_V4_TABLENAME, namesStr)

result, err := p.reader.GetListResultV3(ctx, query)
if err != nil {
return nil, err
}

statuses := []model.PodOnboardingStatus{}

// for each pod, check if we have all the required metadata
for _, row := range result {
status := model.PodOnboardingStatus{}
switch v := row.Data["k8s_cluster_name"].(type) {
case string:
status.HasClusterName = true
status.ClusterName = v
case *string:
status.HasClusterName = *v != ""
status.ClusterName = *v
}
switch v := row.Data["k8s_node_name"].(type) {
case string:
status.HasNodeName = true
status.NodeName = v
case *string:
status.HasNodeName = *v != ""
status.NodeName = *v
}
switch v := row.Data["k8s_namespace_name"].(type) {
case string:
status.HasNamespaceName = true
status.NamespaceName = v
case *string:
status.HasNamespaceName = *v != ""
status.NamespaceName = *v
}
switch v := row.Data["k8s_deployment_name"].(type) {
case string:
status.HasDeploymentName = true
case *string:
status.HasDeploymentName = *v != ""
}
switch v := row.Data["k8s_statefulset_name"].(type) {
case string:
status.HasStatefulsetName = true
case *string:
status.HasStatefulsetName = *v != ""
}
switch v := row.Data["k8s_daemonset_name"].(type) {
case string:
status.HasDaemonsetName = true
case *string:
status.HasDaemonsetName = *v != ""
}
switch v := row.Data["k8s_cronjob_name"].(type) {
case string:
status.HasCronjobName = true
case *string:
status.HasCronjobName = *v != ""
}
switch v := row.Data["k8s_job_name"].(type) {
case string:
status.HasJobName = true
case *string:
status.HasJobName = *v != ""
}

switch v := row.Data["k8s_pod_name"].(type) {
case string:
status.PodName = v
case *string:
status.PodName = *v
}

if !status.HasClusterName ||
!status.HasNodeName ||
!status.HasNamespaceName ||
(!status.HasDeploymentName && !status.HasStatefulsetName && !status.HasDaemonsetName && !status.HasCronjobName && !status.HasJobName) {
statuses = append(statuses, status)
}
}

return statuses, nil
}

func (p *PodsRepo) getMetadataAttributes(ctx context.Context, req model.PodListRequest) (map[string]map[string]string, error) {
podAttrs := map[string]map[string]string{}

Expand Down
23 changes: 23 additions & 0 deletions pkg/query-service/model/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,3 +731,26 @@ type VolumeListRecord struct {
VolumeUsage float64 `json:"volumeUsage"`
Meta map[string]string `json:"meta"`
}

type PodOnboardingStatus struct {
ClusterName string `json:"clusterName"`
NodeName string `json:"nodeName"`
NamespaceName string `json:"namespaceName"`
PodName string `json:"podName"`
HasClusterName bool `json:"hasClusterName"`
HasNodeName bool `json:"hasNodeName"`
HasNamespaceName bool `json:"hasNamespaceName"`
HasDeploymentName bool `json:"hasDeploymentName"`
HasStatefulsetName bool `json:"hasStatefulsetName"`
HasDaemonsetName bool `json:"hasDaemonsetName"`
HasCronjobName bool `json:"hasCronjobName"`
HasJobName bool `json:"hasJobName"`
}

type OnboardingStatus struct {
DidSendPodMetrics bool `json:"didSendPodMetrics"`
DidSendNodeMetrics bool `json:"didSendNodeMetrics"`
DidSendClusterMetrics bool `json:"didSendClusterMetrics"`
IsSendingOptionalPodMetrics bool `json:"isSendingOptionalPodMetrics"`
IsSendingRequiredMetadata []PodOnboardingStatus `json:"isSendingRequiredMetadata"`
}
Loading