diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 43fb01179a6c..e04d4cee0e3a 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -603,6 +603,7 @@ go_library( "//pkg/util/uint128", "//pkg/util/unique", "//pkg/util/uuid", + "//pkg/util/vector", "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_crlib//crtime", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 74a2d3df0e25..9b4291973d2e 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -54,7 +54,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/encoding" - "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/intsets" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -63,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/cockroach/pkg/util/vector" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -4462,15 +4462,99 @@ func (dsp *DistSQLPlanner) addDistinctProcessors( func (dsp *DistSQLPlanner) createPlanForVectorSearch( planCtx *PlanningCtx, n *vectorSearchNode, ) (*PhysicalPlan, error) { - return nil, unimplemented.New("vector search", - "vector search is not yet supported by the DistSQLPlanner") + var queryVector vector.T + switch t := n.queryVector.(type) { + case *tree.DPGVector: + queryVector = t.T + default: + return nil, errors.AssertionFailedf("unexpected query vector type: %T", t) + } + + p := planCtx.NewPhysicalPlan() + colTypes := getTypesFromResultColumns(n.resultCols) + spec := &execinfrapb.VectorSearchSpec{ + PrefixKey: n.prefixKey, + QueryVector: queryVector, + TargetNeighborCount: n.targetNeighborCount, + } + fetchCols := make([]descpb.ColumnID, len(n.cols)) + for i, col := range n.cols { + fetchCols[i] = col.GetID() + } + if err := rowenc.InitIndexFetchSpec( + &spec.FetchSpec, + planCtx.ExtendedEvalCtx.Codec, + n.table, + n.index, + fetchCols, + ); err != nil { + return nil, err + } + + // Execute the vector search on the gateway node. + corePlacement := []physicalplan.ProcessorCorePlacement{{ + SQLInstanceID: dsp.gatewaySQLInstanceID, + Core: execinfrapb.ProcessorCoreUnion{VectorSearch: spec}, + }} + p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, colTypes, execinfrapb.Ordering{}) + p.PlanToStreamColMap = identityMap(make([]int, len(colTypes)), len(colTypes)) + return p, nil } func (dsp *DistSQLPlanner) createPlanForVectorMutationSearch( ctx context.Context, planCtx *PlanningCtx, n *vectorMutationSearchNode, ) (*PhysicalPlan, error) { - return nil, unimplemented.New("vector mutation search", - "vector mutation search is not yet supported by the DistSQLPlanner") + plan, err := dsp.createPhysPlanForPlanNode(ctx, planCtx, n.input) + if err != nil { + return nil, err + } + // Add the partition column. Also add the quantized vector column for index + // puts. + inputTypes := plan.GetResultTypes() + outputTypes := append(inputTypes, types.Int) + plan.PlanToStreamColMap = append(plan.PlanToStreamColMap, len(inputTypes)) + if n.isIndexPut { + outputTypes = append(outputTypes, types.Bytes) + plan.PlanToStreamColMap = append(plan.PlanToStreamColMap, len(inputTypes)+1) + } + // Retrieve the prefix and suffix index columns. + prefixKeyColumnOrdinals := make([]uint32, len(n.prefixKeyCols)) + for i, col := range n.prefixKeyCols { + prefixKeyColumnOrdinals[i] = uint32(plan.PlanToStreamColMap[col]) + } + suffixKeyColumnOrdinals := make([]uint32, len(n.suffixKeyCols)) + for i, col := range n.suffixKeyCols { + suffixKeyColumnOrdinals[i] = uint32(plan.PlanToStreamColMap[col]) + } + keyAndSuffixCols := n.table.IndexFetchSpecKeyAndSuffixColumns(n.index) + prefixKeyCols := keyAndSuffixCols[n.index.NumKeyColumns()-1:] + suffixKeyCols := keyAndSuffixCols[n.index.NumKeyColumns():] + spec := &execinfrapb.VectorMutationSearchSpec{ + PrefixKeyColumnOrdinals: prefixKeyColumnOrdinals, + PrefixKeyColumns: prefixKeyCols, + QueryVectorColumnOrdinal: uint32(plan.PlanToStreamColMap[n.queryVectorCol]), + SuffixKeyColumnOrdinals: suffixKeyColumnOrdinals, + SuffixKeyColumns: suffixKeyCols, + IsIndexPut: n.isIndexPut, + } + // VectorMutationSearch operators materialize partition and quantized-vec + // columns rather than fetching from the table, so leave fetchCols empty. + var fetchCols []descpb.ColumnID + if err := rowenc.InitIndexFetchSpec( + &spec.FetchSpec, + planCtx.ExtendedEvalCtx.Codec, + n.table, + n.index, + fetchCols, + ); err != nil { + return nil, err + } + + // The vector mutation search can be conducted for each row independently, so + // it's fine to instantiate one instance for each stream. + pSpec := execinfrapb.ProcessorCoreUnion{VectorMutationSearch: spec} + plan.AddNoGroupingStage(pSpec, execinfrapb.PostProcessSpec{}, outputTypes, execinfrapb.Ordering{}) + return plan, nil } func (dsp *DistSQLPlanner) createPlanForOrdinality( diff --git a/pkg/sql/execinfrapb/BUILD.bazel b/pkg/sql/execinfrapb/BUILD.bazel index cee0259287e4..ea41afa093d1 100644 --- a/pkg/sql/execinfrapb/BUILD.bazel +++ b/pkg/sql/execinfrapb/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", + "//pkg/sql/sem/catid", # keep "//pkg/sql/sem/eval", "//pkg/sql/sem/normalize", "//pkg/sql/sem/tree", @@ -48,6 +49,7 @@ go_library( "//pkg/util/protoutil", "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", + "//pkg/util/vector", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index 4cf0449f6619..60b55e887268 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/cockroach/pkg/util/vector" "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" ) @@ -95,6 +96,41 @@ func (v *ValuesCoreSpec) summary() (string, []string) { return "Values", []string{detail} } +// summary implements the diagramCellType interface. +func (v *VectorSearchSpec) summary() (string, []string) { + details := []string{ + fmt.Sprintf("%s@%s", v.FetchSpec.TableName, v.FetchSpec.IndexName), + fmt.Sprintf("Nearest Neighbor Target Count: %d", v.TargetNeighborCount), + fmt.Sprintf("Query Vector: %s", vector.T(v.QueryVector).String()), + } + if len(v.PrefixKey) > 0 { + vals, _ := encoding.PrettyPrintValuesWithTypes(nil /* valDirs */, v.PrefixKey) + details = append(details, fmt.Sprintf("Prefix Vals: %s", strings.Join(vals, "/"))) + } + return "VectorSearch", details +} + +// summary implements the diagramCellType interface. +func (v *VectorMutationSearchSpec) summary() (string, []string) { + var mutationType string + if v.IsIndexPut { + mutationType = "Index Put" + } else { + mutationType = "Index Delete" + } + details := []string{ + fmt.Sprintf("%s@%s", v.FetchSpec.TableName, v.FetchSpec.IndexName), + mutationType, + fmt.Sprintf("Query Vector Col: @%d", v.QueryVectorColumnOrdinal+1), + } + if len(v.PrefixKeyColumnOrdinals) > 0 { + details = append(details, + fmt.Sprintf("Prefix Columns: %s", colListStr(v.PrefixKeyColumnOrdinals)), + ) + } + return "VectorMutationSearch", details +} + // summary implements the diagramCellType interface. func (a *AggregatorSpec) summary() (string, []string) { details := make([]string, 0, len(a.Aggregations)+1) diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto index 69165db28f9e..d84e49023437 100644 --- a/pkg/sql/execinfrapb/processors.proto +++ b/pkg/sql/execinfrapb/processors.proto @@ -123,9 +123,11 @@ message ProcessorCoreUnion { optional IngestStoppedSpec ingestStopped = 44; optional LogicalReplicationWriterSpec logicalReplicationWriter = 45; optional LogicalReplicationOfflineScanSpec logicalReplicationOfflineScan = 46; + optional VectorSearchSpec vectorSearch = 47; + optional VectorMutationSearchSpec vectorMutationSearch = 48; reserved 6, 12, 14, 17, 18, 19, 20, 32; - // NEXT ID: 47. + // NEXT ID: 49. } // NoopCoreSpec indicates a "no-op" processor core. This is used when we just diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index 3aa879449a37..cb877a82fea6 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -1089,3 +1089,49 @@ message InsertSpec { // Whether the insert should be autocommitted. optional bool auto_commit = 4 [(gogoproto.nullable) = false]; } + +// VectorSearchSpec is the specification for a vector-search operator, which +// traverses a CSPANN index tree to find candidate nearest neighbors for a query +// vector. It returns the primary keys of the candidate rows. +message VectorSearchSpec { + optional sqlbase.IndexFetchSpec fetch_spec = 1 [(gogoproto.nullable) = false]; + + // PrefixKey constrains the prefix index columns to a single value. It is + // empty for an index without prefix columns. + optional bytes prefix_key = 2 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"]; + + // QueryVector is the vector to search for. + repeated float query_vector = 3; + + // TargetNeighborCount is the number of nearest neighbors to search for. + optional uint64 target_neighbor_count = 4 [(gogoproto.nullable) = false]; +} + +// VectorMutationSearchSpec is the specification for a vector-mutation-search +// operator, which traverses a CSPANN index tree to find the partitions where +// input query vectors belong, so that the vectors can be added to or removed +// from the index. It returns the set of input columns, plus the key of the +// discovered partition, and optionally the quantized vector. +message VectorMutationSearchSpec { + optional sqlbase.IndexFetchSpec fetch_spec = 1 [(gogoproto.nullable) = false]; + + // PrefixKeyColumnOrdinals and PrefixKeyColumns are set for an index with + // prefix columns. They identify the input columns that will be used to + // constrain each prefix column to a single value for the search. + repeated uint32 prefix_key_column_ordinals = 2; + repeated cockroach.sql.sqlbase.IndexFetchSpec.KeyColumn prefix_key_columns = 3 [(gogoproto.nullable) = false]; + + // QueryVectorColumnOrdinal identifies the vectors that will be inserted into + // or removed from the vector index. + optional uint32 query_vector_column_ordinal = 4 [(gogoproto.nullable) = false]; + + // SuffixKeyColumnOrdinals and SuffixKeyColumns are set for an index del. + // They identify a subset of the primary key columns to be used in locating + // the vector index entry for a specific row. + repeated uint32 suffix_key_column_ordinals = 5; + repeated cockroach.sql.sqlbase.IndexFetchSpec.KeyColumn suffix_key_columns = 6 [(gogoproto.nullable) = false]; + + // IsIndexPut is true if the search is being conducted for an index put, + // instead of an index del. + optional bool is_index_put = 7 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/logictest/testdata/logic_test/vector_index b/pkg/sql/logictest/testdata/logic_test/vector_index index 2af1acc35d5e..bacac3f1e5a1 100644 --- a/pkg/sql/logictest/testdata/logic_test/vector_index +++ b/pkg/sql/logictest/testdata/logic_test/vector_index @@ -269,7 +269,7 @@ CREATE TABLE exec_test ( VECTOR INDEX (vec1) ) -statement error unimplemented: execution planning for vector index search is not yet implemented +statement error unimplemented: vector-mutation-search processor unimplemented INSERT INTO exec_test (a, vec1) values (1, '[1, 2, 3]') statement ok diff --git a/pkg/sql/rowexec/BUILD.bazel b/pkg/sql/rowexec/BUILD.bazel index b61720d6d94b..de8160210f53 100644 --- a/pkg/sql/rowexec/BUILD.bazel +++ b/pkg/sql/rowexec/BUILD.bazel @@ -96,6 +96,7 @@ go_library( "//pkg/util/collatedstring", "//pkg/util/ctxgroup", "//pkg/util/encoding", + "//pkg/util/errorutil/unimplemented", "//pkg/util/hlc", "//pkg/util/intsets", "//pkg/util/log", diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go index 9abbd7cd9864..b410ebd59434 100644 --- a/pkg/sql/rowexec/processors.go +++ b/pkg/sql/rowexec/processors.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -301,6 +302,20 @@ func NewProcessor( } return newWindower(ctx, flowCtx, processorID, core.Windower, inputs[0], post) } + if core.VectorSearch != nil { + if err := checkNumIn(inputs, 0); err != nil { + return nil, err + } + return nil, unimplemented.New("vector-search", + "vector-search processor unimplemented") + } + if core.VectorMutationSearch != nil { + if err := checkNumIn(inputs, 1); err != nil { + return nil, err + } + return nil, unimplemented.New("vector-mutation-search", + "vector-mutation-search processor unimplemented") + } if core.LocalPlanNode != nil { numInputs := int(core.LocalPlanNode.NumInputs) if err := checkNumIn(inputs, numInputs); err != nil {