Skip to content

Commit

Permalink
[fix][dingo-executor] Support selection for coprocessorV2 (#1345)
Browse files Browse the repository at this point in the history
  • Loading branch information
githubgxll authored Feb 25, 2025
1 parent 64bee41 commit 03c8f7f
Show file tree
Hide file tree
Showing 16 changed files with 999 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ private DingoIndexFullScanVisitFun() {
relOp,
rel.isPushDown(),
rel.getSelection(),
0
0,
transaction.isAutoCommit()
));
}
assert indexScanvertex != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ private DingoIndexRangeScanVisitFun() {
relOp,
rel.isPushDown(),
rel.getSelection(),
0
0,
transaction.isAutoCommit()
));
}
OutputHint hint = new OutputHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ private DingoIndexScanWithRelOpVisitFun() {
rel.isPushDown(),
td.version,
0,
td.getCodecVersion()
td.getCodecVersion(),
null
);
if (relOp instanceof PipeOp) {
return new Vertex(SCAN_WITH_PIPE_OP, param);
Expand Down Expand Up @@ -231,7 +232,9 @@ private DingoIndexScanWithRelOpVisitFun() {
rel.isPushDown(),
td.version,
0,
td.getCodecVersion()
td.getCodecVersion(),
null,
transaction.isAutoCommit()
);
if (relOp instanceof PipeOp) {
return new Vertex(TXN_SCAN_WITH_PIPE_OP, param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ private DingoScanWithRelOpVisitFun() {
rel.isPushDown(),
td.version,
rel.getLimit(),
td.getCodecVersion()
td.getCodecVersion(),
null
);
if (relOp instanceof PipeOp) {
return new Vertex(SCAN_WITH_PIPE_OP, param);
Expand Down Expand Up @@ -256,7 +257,9 @@ private DingoScanWithRelOpVisitFun() {
rel.isPushDown(),
td.version,
rel.getLimit(),
td.getCodecVersion()
td.getCodecVersion(),
null,
transaction.isAutoCommit()
);
if (relOp instanceof PipeOp) {
return new Vertex(TXN_SCAN_WITH_PIPE_OP, param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.dingodb.expr.runtime.expr.NullaryAggExpr;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.io.ByteArrayOutputStream;
Expand All @@ -52,6 +53,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
@JsonTypeName("scanRel")
@JsonPropertyOrder({
"tableId",
Expand Down Expand Up @@ -79,6 +81,9 @@ public class ScanWithRelOpParam extends ScanParam {
@Setter
protected int limit;

@JsonProperty("selection")
protected List<Integer> selection;

@Getter
protected transient CoprocessorV2 coprocessor;

Expand Down Expand Up @@ -126,7 +131,8 @@ public ScanWithRelOpParam(
boolean pushDown,
int schemaVersion,
int limit,
int codecVersion
int codecVersion,
List<Integer> selection
) {
super(tableId, schema, keyMapping, schemaVersion, codecVersion);
this.relOp = relOp;
Expand All @@ -135,6 +141,7 @@ public ScanWithRelOpParam(
coprocessor = null;
this.limit = limit;
config = new DingoRelConfig();
this.selection = selection;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,34 @@
import io.dingodb.codec.KeyValueCodec;
import io.dingodb.common.CommonId;
import io.dingodb.common.CoprocessorV2;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.type.DingoType;
import io.dingodb.common.type.TupleMapping;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.expr.DingoCompileContext;
import io.dingodb.exec.utils.SchemaWrapperUtils;
import io.dingodb.exec.utils.relop.RelOpMappingVisitor;
import io.dingodb.exec.utils.relop.RelOpSelectionVisitor;
import io.dingodb.exec.utils.relop.SelectionFlag;
import io.dingodb.exec.utils.relop.SelectionObj;
import io.dingodb.expr.coding.CodingFlag;
import io.dingodb.expr.coding.RelOpCoder;
import io.dingodb.expr.common.type.TupleType;
import io.dingodb.expr.rel.RelOp;
import io.dingodb.meta.entity.Table;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
@Getter
public class TxnIndexRangeScanParam extends ScanWithRelOpParam {

Expand Down Expand Up @@ -68,6 +78,8 @@ public class TxnIndexRangeScanParam extends ScanWithRelOpParam {
protected List<Integer> mapList;
@JsonProperty("selection")
private TupleMapping selection;
@JsonProperty("isAutoCommit")
private final boolean isAutoCommit;

public TxnIndexRangeScanParam(CommonId indexTableId,
CommonId tableId,
Expand All @@ -82,9 +94,10 @@ public TxnIndexRangeScanParam(CommonId indexTableId,
RelOp relOp,
boolean pushDown,
TupleMapping selection,
int limit) {
int limit,
boolean isAutoCommit) {
super(tableId, index.tupleType(), keyMapping, relOp, outputSchema,
pushDown, index.getVersion(), limit, table.getCodecVersion());
pushDown, index.getVersion(), limit, table.getCodecVersion(), selection.stream().boxed().collect(Collectors.toList()));
this.indexSchema = index.tupleType();
this.indexTableId = indexTableId;
this.isLookup = isLookup;
Expand All @@ -94,6 +107,7 @@ public TxnIndexRangeScanParam(CommonId indexTableId,
this.scanTs = scanTs;
this.timeout = timeout;
this.selection = selection;
this.isAutoCommit = isAutoCommit;
this.codec = CodecService.getDefault().createKeyValueCodec(
index.getCodecVersion(), index.version, index.tupleType(), index.keyMapping());
if (isLookup) {
Expand All @@ -109,16 +123,48 @@ public void init(Vertex vertex) {
if (relOp == null) {
return;
}
relOp = relOp.compile(new DingoCompileContext(
RelOp relOpCompile = relOp.compile(new DingoCompileContext(
(TupleType) indexSchema.getType(),
(TupleType) vertex.getParasType().getType()
), config);
if (pushDown) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
if (RelOpCoder.INSTANCE.visit(relOp, os) == CodingFlag.OK) {
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) == CodingFlag.OK) {
List<Integer> selection = IntStream.range(0, indexSchema.fieldCount())
.boxed()
.collect(Collectors.toList());
Set<Integer> selections = new HashSet<>();
SelectionObj selectionObj = new SelectionObj(selections, true);
boolean isSelection = false;
if (isAutoCommit() && RelOpSelectionVisitor.INSTANCE.visit(relOp, selectionObj) == SelectionFlag.OK
&& selectionObj.isProject() && selections.size() != selection.size()) {
try {
selection.clear();
selection.addAll(selections);
selection.sort(Comparator.naturalOrder());
relOpCompile = RelOpMappingVisitor.INSTANCE.visit(relOp, selection);
LogUtils.debug(log, "jobId:{}, new relOp: {}", vertex.getTask().getJobId(), relOpCompile);
isSelection = true;
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
selection = IntStream.range(0, indexSchema.fieldCount())
.boxed()
.collect(Collectors.toList());
}
} else {
LogUtils.debug(log, "jobId:{}, origin relOp: {}", vertex.getTask().getJobId(), relOp);
}
if (isSelection) {
relOpCompile = relOpCompile.compile(new DingoCompileContext(
(TupleType) indexSchema.select(TupleMapping.of(selection)).getType(),
(TupleType) vertex.getParasType().getType()
), config);
os = new ByteArrayOutputStream();
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) != CodingFlag.OK) {
relOp = relOpCompile;
return;
}
}
TupleMapping keyMapping = indexKeyMapping();
TupleMapping outputKeyMapping = TupleMapping.of(new int[]{});
coprocessor = CoprocessorV2.builder()
Expand All @@ -130,6 +176,7 @@ public void init(Vertex vertex) {
.build();
}
}
relOp = relOpCompile;
}

public TupleMapping indexKeyMapping() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,34 @@
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.dingodb.common.CommonId;
import io.dingodb.common.CoprocessorV2;
import io.dingodb.common.log.LogUtils;
import io.dingodb.common.type.DingoType;
import io.dingodb.common.type.TupleMapping;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.expr.DingoCompileContext;
import io.dingodb.exec.utils.SchemaWrapperUtils;
import io.dingodb.exec.utils.relop.RelOpMappingVisitor;
import io.dingodb.exec.utils.relop.RelOpSelectionVisitor;
import io.dingodb.exec.utils.relop.SelectionFlag;
import io.dingodb.exec.utils.relop.SelectionObj;
import io.dingodb.expr.coding.CodingFlag;
import io.dingodb.expr.coding.RelOpCoder;
import io.dingodb.expr.common.type.TupleType;
import io.dingodb.expr.rel.RelOp;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.io.ByteArrayOutputStream;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Slf4j
@Getter
@JsonTypeName("txnScanRel")
@JsonPropertyOrder({
Expand All @@ -46,6 +68,7 @@ public class TxnScanWithRelOpParam extends ScanWithRelOpParam {
private final long timeOut;
@JsonProperty("scanTs")
private long scanTs;
private final boolean isAutoCommit;

public TxnScanWithRelOpParam(
CommonId tableId,
Expand All @@ -59,17 +82,83 @@ public TxnScanWithRelOpParam(
boolean pushDown,
int schemaVersion,
int limit,
int codecVersion
int codecVersion,
List<Integer> selection,
boolean isAutoCommit
) {
super(tableId, schema, keyMapping, relOp, outputSchema, pushDown, schemaVersion, limit, codecVersion);
super(tableId, schema, keyMapping, relOp, outputSchema, pushDown, schemaVersion, limit, codecVersion, selection);
this.scanTs = scanTs;
this.isolationLevel = isolationLevel;
this.timeOut = timeOut;
this.isAutoCommit = isAutoCommit;
}

@Override
public void setStartTs(long startTs) {
super.setStartTs(startTs);
this.scanTs = startTs;
}

@Override
public void init(Vertex vertex) {
if (selection == null) {
selection = IntStream.range(0, schema.fieldCount())
.boxed()
.collect(Collectors.toList());
}
RelOp relOpCompile = relOp.compile(new DingoCompileContext(
(TupleType) schema.getType(),
(TupleType) vertex.getParasType().getType()
), config);
if (pushDown) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) == CodingFlag.OK) {
Set<Integer> selections = new HashSet<>();
SelectionObj selectionObj = new SelectionObj(selections, true);
boolean isSelection = false;
if (isAutoCommit() && RelOpSelectionVisitor.INSTANCE.visit(relOp, selectionObj) == SelectionFlag.OK
&& selectionObj.isProject() && selections.size() != selection.size()) {
try {
selection.clear();
selection.addAll(selections);
selection.sort(Comparator.naturalOrder());
relOpCompile = RelOpMappingVisitor.INSTANCE.visit(relOp, selection);
LogUtils.debug(log, "jobId:{}, new relOp: {}", vertex.getTask().getJobId(), relOpCompile);
isSelection = true;
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
selection = IntStream.range(0, schema.fieldCount())
.boxed()
.collect(Collectors.toList());
}
} else {
LogUtils.debug(log, "jobId:{}, origin relOp: {}", vertex.getTask().getJobId(), relOp);
}
if (isSelection) {
relOpCompile = relOpCompile.compile(new DingoCompileContext(
(TupleType) schema.select(TupleMapping.of(selection)).getType(),
(TupleType) vertex.getParasType().getType()
), config);
os = new ByteArrayOutputStream();
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) != CodingFlag.OK) {
relOp = relOpCompile;
return;
}
}
TupleMapping outputKeyMapping = TupleMapping.of(new int[]{});
coprocessor = CoprocessorV2.builder()
.originalSchema(SchemaWrapperUtils.buildSchemaWrapper(schema, keyMapping, tableId.seq))
.resultSchema(SchemaWrapperUtils.buildSchemaWrapper(outputSchema, outputKeyMapping, tableId.seq))
.selection(selection)
.relExpr(os.toByteArray())
.codecVersion(codecVersion)
.build();
if (limit > 0) {
coprocessor.setLimit(limit);
}
}
}
relOp = relOpCompile;
}

}
Loading

0 comments on commit 03c8f7f

Please sign in to comment.