Skip to content

Commit

Permalink
[feat][dingo-executor] Add transaction parameter settings
Browse files Browse the repository at this point in the history
  • Loading branch information
githubgxll authored and JYcz committed Jan 8, 2024
1 parent 629b7a2 commit 9d20eae
Show file tree
Hide file tree
Showing 34 changed files with 242 additions and 89 deletions.
1 change: 1 addition & 0 deletions dingo-calcite/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ data: {
"SAMPLES"
"SAMPLERATE"
"PESSIMISTIC"
"OPTIMISTIC"
"BLOCKS"
"HASH"
"LOCKS"
Expand Down
16 changes: 8 additions & 8 deletions dingo-calcite/src/main/codegen/includes/Transaction.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@

SqlBeginTx SqlStartTx(): {
final Span s;
boolean pessimistic = false;
String txnMode = "";
} {
<START> { s = span(); } <TRANSACTION> [<PESSIMISTIC> { pessimistic = true; }]{ return new SqlBeginTx(s.end(this), pessimistic); }
<START> { s = span(); } <TRANSACTION> [<PESSIMISTIC> { txnMode = "PESSIMISTIC"; } | <OPTIMISTIC> { txnMode = "OPTIMISTIC"; }]{ return new SqlBeginTx(s.end(this), txnMode); }
}

SqlBeginTx SqlBegin(): {
final Span s; boolean pessimistic = false;
final Span s; String txnMode = "";
} {
<BEGIN> [<PESSIMISTIC> { pessimistic = true; }] { s = span(); return new SqlBeginTx(s.end(this), pessimistic); }
<BEGIN> [<PESSIMISTIC> { txnMode = "PESSIMISTIC"; } | <OPTIMISTIC> { txnMode = "OPTIMISTIC"; }] { s = span(); return new SqlBeginTx(s.end(this), txnMode); }
}

SqlLock SqlLock(): {
Expand All @@ -41,8 +41,8 @@ SqlLock SqlLock(): {
<IDENTIFIER> { tableNameList.add(token.image); }
)*
{ return new SqlLockTable(s.end(this), tableNameList); }
|
<BLOCKS>
|
<BLOCKS>
{
sqlBlockList.add(readBlock());
}
Expand Down Expand Up @@ -94,8 +94,8 @@ SqlUnLock SqlUnLock(): {
<IDENTIFIER> { tableNameList.add(token.image); }
)*
{ return new SqlUnLockTable(s.end(this), tableNameList); }
|
<BLOCKS>
|
<BLOCKS>
{
sqlBlockList.add(readBlock());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.List;

public class SqlBeginTx extends SqlDdl {
public boolean pessimistic;
public String txnMode = "";

private static final SqlOperator OPERATOR =
new SqlSpecialOperator("START TRANSACTION", SqlKind.OTHER_DDL);

Expand All @@ -36,9 +37,9 @@ public class SqlBeginTx extends SqlDdl {
*
* @param pos
*/
public SqlBeginTx(SqlParserPos pos, boolean pessimistic) {
public SqlBeginTx(SqlParserPos pos, String txnMode) {
super(OPERATOR, pos);
this.pessimistic = pessimistic;
this.txnMode = txnMode;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.dingodb.calcite.grammar.dql.SqlShowTables;
import io.dingodb.calcite.grammar.dql.SqlShowVariables;
import io.dingodb.calcite.grammar.dql.SqlShowWarnings;
import io.dingodb.exec.transaction.base.TransactionType;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
Expand All @@ -53,6 +54,7 @@
import org.apache.commons.lang3.StringUtils;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Optional;

public final class SqlToOperationConverter {
Expand Down Expand Up @@ -159,7 +161,18 @@ public static Optional<Operation> convert(SqlNode sqlNode, Connection connection
return Optional.of(new RollbackTxOperation(connection));
} else if (sqlNode instanceof SqlBeginTx) {
SqlBeginTx sqlBeginTx = (SqlBeginTx) sqlNode;
return Optional.of(new StartTransactionOperation(connection, sqlBeginTx.pessimistic));
boolean pessimistic = false;
try {
if (sqlBeginTx.txnMode.equalsIgnoreCase(TransactionType.PESSIMISTIC.name())) {
pessimistic = true;
} else if (sqlBeginTx.txnMode.equals("") &&
connection.getClientInfo("txn_mode").equalsIgnoreCase(TransactionType.PESSIMISTIC.name())) {
pessimistic = true;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return Optional.of(new StartTransactionOperation(connection, pessimistic));
} else if (sqlNode instanceof SqlLockTable) {
SqlLockTable sqlLockTable = (SqlLockTable) sqlNode;
return Optional.of(new LockTableOperation(connection, sqlLockTable.tableList));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static void renderJob(Job job, RelNode input, Location currentLocation, b

@Override
public Collection<Vertex> visit(@NonNull DingoStreamingConverter rel) {
return DingoStreamingConverterVisitFun.visit(job, idGenerator, currentLocation, this, rel);
return DingoStreamingConverterVisitFun.visit(job, idGenerator, currentLocation, transaction, this, rel);
}

@Override
Expand Down Expand Up @@ -176,7 +176,7 @@ public Collection<Vertex> visit(@NonNull DingoUnion rel) {

@Override
public Collection<Vertex> visit(@NonNull DingoValues rel) {
return DingoValuesVisitFun.visit(job, idGenerator, currentLocation, this, rel);
return DingoValuesVisitFun.visit(job, idGenerator, currentLocation, transaction, this, rel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.operator.params.ReceiveParam;
import io.dingodb.exec.operator.params.SendParam;
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.store.api.transaction.data.IsolationLevel;
import org.checkerframework.checker.nullness.qual.NonNull;

import static io.dingodb.exec.utils.OperatorCodeUtils.RECEIVE;
Expand All @@ -36,14 +38,19 @@ private DingoExchangeFun() {
}

public static Vertex exchange(
Job job, IdGenerator idGenerator, @NonNull Vertex input, @NonNull Location target, DingoType schema
Job job, IdGenerator idGenerator, ITransaction transaction, @NonNull Vertex input, @NonNull Location target, DingoType schema
) {
Task task = input.getTask();
if (target.equals(task.getLocation())) {
return input;
}
CommonId id = idGenerator.getOperatorId(task.getId());
Task rcvTask = job.getOrCreate(target, idGenerator);
Task rcvTask;
if (transaction != null) {
rcvTask = job.getOrCreate(target, idGenerator, transaction.getType(), IsolationLevel.of(transaction.getIsolationLevel()));
} else {
rcvTask = job.getOrCreate(target, idGenerator);
}
CommonId receiveId = idGenerator.getOperatorId(rcvTask.getId());
SendParam sendParam = new SendParam(target.getHost(), target.getPort(), receiveId, schema);
Vertex send = new Vertex(SEND, sendParam);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.partition.DingoPartitionServiceProvider;
import io.dingodb.partition.PartitionService;
import io.dingodb.store.api.transaction.data.IsolationLevel;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -70,9 +71,11 @@ public static Collection<Vertex> visit(

for (RangeDistribution distribution
: ps.calcPartitionRange(rel.getPrefix(), rel.getPrefix(), true, true, distributions)) {
Task task = job.getOrCreate(currentLocation, idGenerator);
Task task;
if (transaction != null) {

task = job.getOrCreate(currentLocation, idGenerator, transaction.getType(), IsolationLevel.of(transaction.getIsolationLevel()));
} else {
task = job.getOrCreate(currentLocation, idGenerator);
}
LikeScanParam param = new LikeScanParam(
tableInfo.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.partition.DingoPartitionServiceProvider;
import io.dingodb.partition.PartitionService;
import io.dingodb.store.api.transaction.data.IsolationLevel;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -103,7 +104,12 @@ public static Collection<Vertex> visit(
rd.isWithEnd());
vertex = new Vertex(PART_RANGE_DELETE, param);
}
Task task = job.getOrCreate(currentLocation, idGenerator);
Task task;
if (transaction != null) {
task = job.getOrCreate(currentLocation, idGenerator, transaction.getType(), IsolationLevel.of(transaction.getIsolationLevel()));
} else {
task = job.getOrCreate(currentLocation, idGenerator);
}
vertex.setId(idGenerator.getOperatorId(task.getId()));
task.putVertex(vertex);
OutputHint outputHint = new OutputHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.dingodb.exec.operator.hash.SimpleHashStrategy;
import io.dingodb.exec.operator.params.HashParam;
import io.dingodb.exec.operator.params.PartitionParam;
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.meta.MetaService;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand All @@ -64,10 +65,11 @@
public class DingoStreamingConverterVisitFun {
@NonNull
public static Collection<Vertex> visit(
Job job, IdGenerator idGenerator, Location currentLocation, DingoJobVisitor visitor, DingoStreamingConverter rel
Job job, IdGenerator idGenerator, Location currentLocation, ITransaction transaction, DingoJobVisitor visitor, DingoStreamingConverter rel
) {
return convertStreaming(
job, idGenerator, currentLocation,
transaction,
dingo(rel.getInput()).accept(visitor),
dingo(rel.getInput()).getStreaming(),
rel.getStreaming(),
Expand All @@ -77,6 +79,7 @@ public static Collection<Vertex> visit(

public static @NonNull Collection<Vertex> convertStreaming(
Job job, IdGenerator idGenerator, Location currentLocation,
ITransaction transaction,
@NonNull Collection<Vertex> inputs,
@NonNull DingoRelStreaming srcStreaming,
@NonNull DingoRelStreaming dstStreaming,
Expand Down Expand Up @@ -106,7 +109,7 @@ public static Collection<Vertex> visit(
if (!Objects.equals(dstDistribution, srcDistribution)) {
outputs = outputs.stream().map(input -> {
Location targetLocation = (dstDistribution == null ? currentLocation : input.getTargetLocation());
return DingoExchangeFun.exchange(job, idGenerator, input, targetLocation, schema);
return DingoExchangeFun.exchange(job, idGenerator, transaction, input, targetLocation, schema);
}).collect(Collectors.toList());
}
if (dstPartitions.size() < media.getPartitions().size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.partition.DingoPartitionServiceProvider;
import io.dingodb.partition.PartitionService;
import io.dingodb.store.api.transaction.data.IsolationLevel;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlKind;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand Down Expand Up @@ -106,9 +107,10 @@ private DingoTableScanVisitFun() {

// TODO
for (RangeDistribution rd : distributions) {
Task task = job.getOrCreate(currentLocation, idGenerator);
Task task;
Vertex vertex;
if (transaction != null) {
task = job.getOrCreate(currentLocation, idGenerator, transaction.getType(), IsolationLevel.of(transaction.getIsolationLevel()));
TxnPartRangeScanParam param = new TxnPartRangeScanParam(
tableInfo.getId(),
rd.id(),
Expand All @@ -131,6 +133,7 @@ private DingoTableScanVisitFun() {
);
vertex = new Vertex(TXN_PART_RANGE_SCAN, param);
} else {
task = job.getOrCreate(currentLocation, idGenerator);
PartRangeScanParam param = new PartRangeScanParam(
tableInfo.getId(),
rd.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import io.dingodb.exec.base.Task;
import io.dingodb.exec.dag.Vertex;
import io.dingodb.exec.operator.params.ValuesParam;
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.exec.transaction.base.TransactionType;
import io.dingodb.partition.DingoPartitionServiceProvider;
import io.dingodb.partition.PartitionService;
import io.dingodb.store.api.transaction.data.IsolationLevel;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.LinkedList;
Expand All @@ -58,11 +61,16 @@ private DingoValuesVisitFun() {
}

public static List<Vertex> visit(
Job job, IdGenerator idGenerator, Location currentLocation, DingoJobVisitor visitor, @NonNull DingoValues rel
Job job, IdGenerator idGenerator, Location currentLocation, ITransaction transaction, DingoJobVisitor visitor, @NonNull DingoValues rel
) {
DingoRelStreaming streaming = rel.getStreaming();
if (streaming.equals(DingoRelStreaming.ROOT)) {
Task task = job.getOrCreate(currentLocation, idGenerator);
Task task;
if (transaction != null) {
task = job.getOrCreate(currentLocation, idGenerator, transaction.getType(), IsolationLevel.of(transaction.getIsolationLevel()));
} else {
task = job.getOrCreate(currentLocation, idGenerator);
}
ValuesParam param = new ValuesParam(rel.getTuples(),
Objects.requireNonNull(DefinitionMapper.mapToDingoType(rel.getRowType()))
);
Expand Down Expand Up @@ -93,7 +101,12 @@ public static List<Vertex> visit(
Objects.requireNonNull(DefinitionMapper.mapToDingoType(rel.getRowType()))
);
Vertex vertex = new Vertex(VALUES, param);
Task task = job.getOrCreate(currentLocation, idGenerator);
Task task;
if (transaction != null) {
task = job.getOrCreate(currentLocation, idGenerator, transaction.getType(), IsolationLevel.of(transaction.getIsolationLevel()));
} else {
task = job.getOrCreate(currentLocation, idGenerator);
}
vertex.setId(idGenerator.getOperatorId(task.getId()));
OutputHint hint = new OutputHint();
hint.setPartId(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.dingodb.common.mysql.client.SessionVariableWatched;
import io.dingodb.exec.transaction.base.ITransaction;
import io.dingodb.exec.transaction.impl.TransactionManager;
import io.dingodb.exec.transaction.util.TransactionUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -88,6 +89,16 @@ public DingoMeta getMeta() {
return (DingoMeta) meta;
}

public ITransaction createTransaction(boolean pessimistic) {
if (transaction == null) {
long startTs = TransactionManager.getStart_ts();
this.transaction = TransactionManager.createTransaction(pessimistic, startTs,
TransactionUtil.convertIsolationLevel(getClientInfo("transaction_isolation")));
transaction.setTransactionConfig(sessionVariables);
}
return transaction;
}

public void cleanTransaction() throws SQLException {
if(transaction != null) {
if (!transaction.isAutoCommit()) {
Expand All @@ -113,8 +124,7 @@ public void beginTransaction(boolean pessimistic) throws SQLException{
this.transaction = null;
}
}
long startTs = TransactionManager.getStart_ts();
this.transaction = TransactionManager.createTransaction(pessimistic, startTs);
createTransaction(pessimistic);
this.autoCommit = false;
}

Expand Down Expand Up @@ -142,8 +152,7 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
}
}
if(autoCommit == false) {
long startTs = TransactionManager.getStart_ts();
this.transaction = TransactionManager.createTransaction(false, startTs);
createTransaction(false);
this.autoCommit = false;
}

Expand Down Expand Up @@ -252,7 +261,23 @@ protected Meta.ExecuteResult prepareAndExecuteInternal(

@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
if (name.equalsIgnoreCase("transaction_isolation")) {
if (transaction != null) {
throw new RuntimeException("Transaction characteristics can't be changed while a transaction is in progress");
}
// optimistic transaction only support REPEATABLE-READ transaction isolation
if (value.equalsIgnoreCase("READ-COMMITTED") && getClientInfo("txn_mode").equalsIgnoreCase("optimistic")) {
return;
}
}
sessionVariables.setProperty(name, value);
if (name.equalsIgnoreCase("autocommit")) {
try {
setAutoCommit(value.equalsIgnoreCase("on") ? true : false);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

SessionVariableWatched.getInstance().notifyObservers(
SessionVariableChange.builder().id(id).name(name).value(value).build()
Expand Down
Loading

0 comments on commit 9d20eae

Please sign in to comment.