Skip to content

Commit

Permalink
Merge #141638
Browse files Browse the repository at this point in the history
141638: sql: plumb vector search execution operators to the row engine r=DrewKimball a=DrewKimball

#### span: export EncodeConstraintKey

This commit exports `SpanBuilder.EncodeConstraintKey` so that it can
be used to directly encode a `constraint.Key` during planning.

Epic: CRDB-42943

Release note: None

#### sql: plumb vector search operators to planNode stage

This commit adds `planNode` implementations for the `VectorSearch`
and `VectorMutationSearch` operators. It also plumbs handling for
these operators from execbuilder to the distsql physical planner,
the latter of which is left unimplemented in this commit. This is
sufficient to run vanilla `EXPLAIN` and `EXPLAIN (VERBOSE)`.

Epic: CRDB-42943

Release note: None

#### sql: plumb vector search operators to row engine

This commit adds processor specs for the `VectorSearch` and
`VectorMutationSearch` operators. It also plumbs handling for the
operators from the distsql physical planner to the row engine.
Execution is left for a following commit.

Epic: CRDB-42943

Release note: None

Co-authored-by: Drew Kimball <[email protected]>
  • Loading branch information
craig[bot] and DrewKimball committed Feb 22, 2025
2 parents 3de70cd + bd35f42 commit fa8342d
Show file tree
Hide file tree
Showing 27 changed files with 1,161 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ go_library(
"user.go",
"values.go",
"vars.go",
"vector_search.go",
"views.go",
"virtual_schema.go",
"virtual_table.go",
Expand Down Expand Up @@ -601,6 +602,7 @@ go_library(
"//pkg/util/uint128",
"//pkg/util/unique",
"//pkg/util/uuid",
"//pkg/util/vector",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
Expand Down
114 changes: 114 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/util/vector"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -449,6 +450,9 @@ var (
ordinalityNotDistributableErr = newQueryNotSupportedError(
"ordinality operation cannot be distributed",
)
cannotDistributeVectorSearchErr = newQueryNotSupportedError(
"vector search operation cannot be distributed",
)
)

// mustWrapNode returns true if a node has no DistSQL-processor equivalent.
Expand Down Expand Up @@ -477,6 +481,8 @@ func (dsp *DistSQLPlanner) mustWrapNode(planCtx *PlanningCtx, node planNode) boo
case *unionNode:
case *valuesNode:
return mustWrapValuesNode(planCtx, n.specifiedInQuery)
case *vectorMutationSearchNode:
case *vectorSearchNode:
case *windowNode:
case *zeroNode:
case *zigzagJoinNode:
Expand Down Expand Up @@ -788,6 +794,10 @@ func checkSupportForPlanNode(
}
return canDistribute, nil

case *vectorSearchNode, *vectorMutationSearchNode:
// Don't allow distribution for vector search operators, for now.
return cannotDistribute, cannotDistributeVectorSearchErr

case *windowNode:
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
Expand Down Expand Up @@ -4122,6 +4132,12 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
}
}

case *vectorMutationSearchNode:
plan, err = dsp.createPlanForVectorMutationSearch(ctx, planCtx, n)

case *vectorSearchNode:
plan, err = dsp.createPlanForVectorSearch(planCtx, n)

case *windowNode:
plan, err = dsp.createPlanForWindow(ctx, planCtx, n)

Expand Down Expand Up @@ -4443,6 +4459,104 @@ func (dsp *DistSQLPlanner) addDistinctProcessors(
plan.SetMergeOrdering(spec.OutputOrdering)
}

func (dsp *DistSQLPlanner) createPlanForVectorSearch(
planCtx *PlanningCtx, n *vectorSearchNode,
) (*PhysicalPlan, error) {
var queryVector vector.T
switch t := n.queryVector.(type) {
case *tree.DPGVector:
queryVector = t.T
default:
return nil, errors.AssertionFailedf("unexpected query vector type: %T", t)
}

p := planCtx.NewPhysicalPlan()
colTypes := getTypesFromResultColumns(n.resultCols)
spec := &execinfrapb.VectorSearchSpec{
PrefixKey: n.prefixKey,
QueryVector: queryVector,
TargetNeighborCount: n.targetNeighborCount,
}
fetchCols := make([]descpb.ColumnID, len(n.cols))
for i, col := range n.cols {
fetchCols[i] = col.GetID()
}
if err := rowenc.InitIndexFetchSpec(
&spec.FetchSpec,
planCtx.ExtendedEvalCtx.Codec,
n.table,
n.index,
fetchCols,
); err != nil {
return nil, err
}

// Execute the vector search on the gateway node.
corePlacement := []physicalplan.ProcessorCorePlacement{{
SQLInstanceID: dsp.gatewaySQLInstanceID,
Core: execinfrapb.ProcessorCoreUnion{VectorSearch: spec},
}}
p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, colTypes, execinfrapb.Ordering{})
p.PlanToStreamColMap = identityMap(make([]int, len(colTypes)), len(colTypes))
return p, nil
}

func (dsp *DistSQLPlanner) createPlanForVectorMutationSearch(
ctx context.Context, planCtx *PlanningCtx, n *vectorMutationSearchNode,
) (*PhysicalPlan, error) {
plan, err := dsp.createPhysPlanForPlanNode(ctx, planCtx, n.input)
if err != nil {
return nil, err
}
// Add the partition column. Also add the quantized vector column for index
// puts.
inputTypes := plan.GetResultTypes()
outputTypes := append(inputTypes, types.Int)
plan.PlanToStreamColMap = append(plan.PlanToStreamColMap, len(inputTypes))
if n.isIndexPut {
outputTypes = append(outputTypes, types.Bytes)
plan.PlanToStreamColMap = append(plan.PlanToStreamColMap, len(inputTypes)+1)
}
// Retrieve the prefix and suffix index columns.
prefixKeyColumnOrdinals := make([]uint32, len(n.prefixKeyCols))
for i, col := range n.prefixKeyCols {
prefixKeyColumnOrdinals[i] = uint32(plan.PlanToStreamColMap[col])
}
suffixKeyColumnOrdinals := make([]uint32, len(n.suffixKeyCols))
for i, col := range n.suffixKeyCols {
suffixKeyColumnOrdinals[i] = uint32(plan.PlanToStreamColMap[col])
}
keyAndSuffixCols := n.table.IndexFetchSpecKeyAndSuffixColumns(n.index)
prefixKeyCols := keyAndSuffixCols[n.index.NumKeyColumns()-1:]
suffixKeyCols := keyAndSuffixCols[n.index.NumKeyColumns():]
spec := &execinfrapb.VectorMutationSearchSpec{
PrefixKeyColumnOrdinals: prefixKeyColumnOrdinals,
PrefixKeyColumns: prefixKeyCols,
QueryVectorColumnOrdinal: uint32(plan.PlanToStreamColMap[n.queryVectorCol]),
SuffixKeyColumnOrdinals: suffixKeyColumnOrdinals,
SuffixKeyColumns: suffixKeyCols,
IsIndexPut: n.isIndexPut,
}
// VectorMutationSearch operators materialize partition and quantized-vec
// columns rather than fetching from the table, so leave fetchCols empty.
var fetchCols []descpb.ColumnID
if err := rowenc.InitIndexFetchSpec(
&spec.FetchSpec,
planCtx.ExtendedEvalCtx.Codec,
n.table,
n.index,
fetchCols,
); err != nil {
return nil, err
}

// The vector mutation search can be conducted for each row independently, so
// it's fine to instantiate one instance for each stream.
pSpec := execinfrapb.ProcessorCoreUnion{VectorMutationSearch: spec}
plan.AddNoGroupingStage(pSpec, execinfrapb.PostProcessSpec{}, outputTypes, execinfrapb.Ordering{})
return plan, nil
}

func (dsp *DistSQLPlanner) createPlanForOrdinality(
ctx context.Context, planCtx *PlanningCtx, n *ordinalityNode,
) (*PhysicalPlan, error) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,31 @@ func (e *distSQLSpecExecFactory) ConstructDeleteRange(
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: delete range")
}

func (e *distSQLSpecExecFactory) ConstructVectorSearch(
table cat.Table,
index cat.Index,
outCols exec.TableColumnOrdinalSet,
prefixKey constraint.Key,
queryVector tree.TypedExpr,
targetNeighborCount uint64,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473,
"experimental opt-driven distsql planning: vector-search")
}

func (e *distSQLSpecExecFactory) ConstructVectorMutationSearch(
input exec.Node,
table cat.Table,
index cat.Index,
prefixKeyCols []exec.NodeColumnOrdinal,
queryVectorCol exec.NodeColumnOrdinal,
suffixKeyCols []exec.NodeColumnOrdinal,
isIndexPut bool,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473,
"experimental opt-driven distsql planning: vector-mutation-search")
}

func (e *distSQLSpecExecFactory) ConstructCreateTable(
schema cat.Schema, ct *tree.CreateTable,
) (exec.Node, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/execinfrapb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/rowenc",
"//pkg/sql/sem/catid", # keep
"//pkg/sql/sem/eval",
"//pkg/sql/sem/normalize",
"//pkg/sql/sem/tree",
Expand All @@ -48,6 +49,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"//pkg/util/vector",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down
36 changes: 36 additions & 0 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/util/vector"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
)
Expand Down Expand Up @@ -95,6 +96,41 @@ func (v *ValuesCoreSpec) summary() (string, []string) {
return "Values", []string{detail}
}

// summary implements the diagramCellType interface.
func (v *VectorSearchSpec) summary() (string, []string) {
details := []string{
fmt.Sprintf("%s@%s", v.FetchSpec.TableName, v.FetchSpec.IndexName),
fmt.Sprintf("Nearest Neighbor Target Count: %d", v.TargetNeighborCount),
fmt.Sprintf("Query Vector: %s", vector.T(v.QueryVector).String()),
}
if len(v.PrefixKey) > 0 {
vals, _ := encoding.PrettyPrintValuesWithTypes(nil /* valDirs */, v.PrefixKey)
details = append(details, fmt.Sprintf("Prefix Vals: %s", strings.Join(vals, "/")))
}
return "VectorSearch", details
}

// summary implements the diagramCellType interface.
func (v *VectorMutationSearchSpec) summary() (string, []string) {
var mutationType string
if v.IsIndexPut {
mutationType = "Index Put"
} else {
mutationType = "Index Delete"
}
details := []string{
fmt.Sprintf("%s@%s", v.FetchSpec.TableName, v.FetchSpec.IndexName),
mutationType,
fmt.Sprintf("Query Vector Col: @%d", v.QueryVectorColumnOrdinal+1),
}
if len(v.PrefixKeyColumnOrdinals) > 0 {
details = append(details,
fmt.Sprintf("Prefix Columns: %s", colListStr(v.PrefixKeyColumnOrdinals)),
)
}
return "VectorMutationSearch", details
}

// summary implements the diagramCellType interface.
func (a *AggregatorSpec) summary() (string, []string) {
details := make([]string, 0, len(a.Aggregations)+1)
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/execinfrapb/processors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ message ProcessorCoreUnion {
optional IngestStoppedSpec ingestStopped = 44;
optional LogicalReplicationWriterSpec logicalReplicationWriter = 45;
optional LogicalReplicationOfflineScanSpec logicalReplicationOfflineScan = 46;
optional VectorSearchSpec vectorSearch = 47;
optional VectorMutationSearchSpec vectorMutationSearch = 48;

reserved 6, 12, 14, 17, 18, 19, 20, 32;
// NEXT ID: 47.
// NEXT ID: 49.
}

// NoopCoreSpec indicates a "no-op" processor core. This is used when we just
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/execinfrapb/processors_sql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1089,3 +1089,49 @@ message InsertSpec {
// Whether the insert should be autocommitted.
optional bool auto_commit = 4 [(gogoproto.nullable) = false];
}

// VectorSearchSpec is the specification for a vector-search operator, which
// traverses a CSPANN index tree to find candidate nearest neighbors for a query
// vector. It returns the primary keys of the candidate rows.
message VectorSearchSpec {
optional sqlbase.IndexFetchSpec fetch_spec = 1 [(gogoproto.nullable) = false];

// PrefixKey constrains the prefix index columns to a single value. It is
// empty for an index without prefix columns.
optional bytes prefix_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];

// QueryVector is the vector to search for.
repeated float query_vector = 3;

// TargetNeighborCount is the number of nearest neighbors to search for.
optional uint64 target_neighbor_count = 4 [(gogoproto.nullable) = false];
}

// VectorMutationSearchSpec is the specification for a vector-mutation-search
// operator, which traverses a CSPANN index tree to find the partitions where
// input query vectors belong, so that the vectors can be added to or removed
// from the index. It returns the set of input columns, plus the key of the
// discovered partition, and optionally the quantized vector.
message VectorMutationSearchSpec {
optional sqlbase.IndexFetchSpec fetch_spec = 1 [(gogoproto.nullable) = false];

// PrefixKeyColumnOrdinals and PrefixKeyColumns are set for an index with
// prefix columns. They identify the input columns that will be used to
// constrain each prefix column to a single value for the search.
repeated uint32 prefix_key_column_ordinals = 2;
repeated cockroach.sql.sqlbase.IndexFetchSpec.KeyColumn prefix_key_columns = 3 [(gogoproto.nullable) = false];

// QueryVectorColumnOrdinal identifies the vectors that will be inserted into
// or removed from the vector index.
optional uint32 query_vector_column_ordinal = 4 [(gogoproto.nullable) = false];

// SuffixKeyColumnOrdinals and SuffixKeyColumns are set for an index del.
// They identify a subset of the primary key columns to be used in locating
// the vector index entry for a specific row.
repeated uint32 suffix_key_column_ordinals = 5;
repeated cockroach.sql.sqlbase.IndexFetchSpec.KeyColumn suffix_key_columns = 6 [(gogoproto.nullable) = false];

// IsIndexPut is true if the search is being conducted for an index put,
// instead of an index del.
optional bool is_index_put = 7 [(gogoproto.nullable) = false];
}
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/vector_index
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ CREATE TABLE exec_test (
VECTOR INDEX (vec1)
)

statement error unimplemented: execution planning for vector index search is not yet implemented
statement error unimplemented: vector-mutation-search processor unimplemented
INSERT INTO exec_test (a, vec1) values (1, '[1, 2, 3]')

statement ok
Expand Down
Loading

0 comments on commit fa8342d

Please sign in to comment.