Skip to content

Commit

Permalink
[fix][dingo-calcite] Add update increment
Browse files Browse the repository at this point in the history
  • Loading branch information
guojn1 authored and JYcz committed Jan 4, 2024
1 parent a47c9e6 commit 49a527f
Show file tree
Hide file tree
Showing 26 changed files with 236 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.sql.Connection;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.util.Objects;

public class SetOptionOperation implements DdlOperation {
Expand All @@ -40,7 +41,7 @@ public class SetOptionOperation implements DdlOperation {

private String name;

private String value;
private String value = "";

public SetOptionOperation(Connection connection, SqlSetOption setOption) {
this.connection = connection;
Expand All @@ -57,6 +58,7 @@ public SetOptionOperation(Connection connection, SqlSetOption setOption) {
if ("USER".equals(scope)) {
name = "@" + name;
}
name = name.toLowerCase();
SqlNode sqlNode = setOption.getValue();
if (sqlNode instanceof SqlNumericLiteral) {
SqlNumericLiteral numericLiteral = (SqlNumericLiteral) sqlNode;
Expand All @@ -65,7 +67,13 @@ public SetOptionOperation(Connection connection, SqlSetOption setOption) {
sqlIdentifier = (SqlIdentifier) sqlNode;
value = sqlIdentifier.names.get(0).toLowerCase();
} else if (sqlNode instanceof SqlLiteral) {
value = "";
Object val = ((SqlLiteral) sqlNode).getValue();
if (val != null) {
value = val.toString();
}
}
if (value.startsWith("'") && value.endsWith("'")) {
value = value.substring(1, value.length() - 1);
}
}

Expand All @@ -74,17 +82,14 @@ public void execute() {
try {
value = VariableValidator.validator(name, value, scope);
if ("SESSION".equals(scope) || "USER".equals(scope)) {
if (value.contains("'")) {
value = value.replace("'", "");
}
if (!setCharacter(name, value)) {
connection.setClientInfo(name, value);
}
} else if ("SYSTEM".equals(scope)) {
putGlobalVariable(name, value);
ScopeVariables.globalVariables.put(name, value);
}
} catch (SQLClientInfoException e) {
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021 DataCanvas
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.dingodb.calcite.operation;

import java.util.Iterator;
import java.util.List;

public class ShowLocksOperation implements QueryOperation {

@Override
public Iterator getIterator() {
return null;
}

@Override
public List<String> columns() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.dingodb.calcite.grammar.dql.SqlShowFullTables;
import io.dingodb.calcite.grammar.dql.SqlShowGrants;
import io.dingodb.calcite.grammar.dql.SqlShowPlugins;
import io.dingodb.calcite.grammar.dql.SqlShowLocks;
import io.dingodb.calcite.grammar.dql.SqlShowTableDistribution;
import io.dingodb.calcite.grammar.dql.SqlShowTableStatus;
import io.dingodb.calcite.grammar.dql.SqlShowTables;
Expand All @@ -54,7 +55,10 @@
import java.sql.Connection;
import java.util.Optional;

public class SqlToOperationConverter {
public final class SqlToOperationConverter {

private SqlToOperationConverter() {
}

public static Optional<Operation> convert(SqlNode sqlNode, Connection connection, DingoParserContext context) {
if (sqlNode instanceof SqlShowWarnings) {
Expand Down Expand Up @@ -186,6 +190,8 @@ public static Optional<Operation> convert(SqlNode sqlNode, Connection connection
} else if (sqlNode instanceof SqlShowCharset) {
SqlShowCharset sqlShowCharset = (SqlShowCharset) sqlNode;
return Optional.of(new ShowCharsetOperation(sqlShowCharset.sqlLikePattern));
} else if (sqlNode instanceof SqlShowLocks) {
return Optional.of(new ShowLocksOperation());
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ public static String validator(String name, String value, String scope) {
throw new RuntimeException(String.format(ErrorCode.ER_IMMUTABLE_VARIABLES.message, name));
}
if (name.equalsIgnoreCase("autocommit")
|| name.equalsIgnoreCase("transaction_read_only")) {
|| name.equalsIgnoreCase("transaction_read_only")
|| name.equalsIgnoreCase("txn_inert_check")
|| name.equalsIgnoreCase("txn_retry")
) {
if (!SWITCH.contains(value)) {
throw DINGO_RESOURCE.invalidVariableArg(name, value).ex();
}
Expand All @@ -65,6 +68,10 @@ public static String validator(String name, String value, String scope) {
if (!TX_MODE.contains(value)) {
throw DINGO_RESOURCE.invalidVariableArg(name, value).ex();
}
} else if (name.endsWith("timeout") || name.equalsIgnoreCase("txn_retry_cnt")) {
if (!value.matches("\\d+")) {
throw DINGO_RESOURCE.incorrectArgType(name).ex();
}
}

if ("SYSTEM".equals(scope)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public RelNode visit(RelNode other) {
DingoValues dingoValues = (DingoValues) values;
dingoValues.setHasAutoIncrement(true);
dingoValues.setAutoIncrementColIndex(autoIncrementColIndex);
dingoValues.setCommonId(table.getTableId());
return dingoValues;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.dingodb.calcite.rel;

import io.dingodb.calcite.visitor.DingoRelVisitor;
import io.dingodb.common.CommonId;
import lombok.Getter;
import lombok.Setter;
import org.apache.calcite.plan.RelOptCluster;
Expand All @@ -39,6 +40,10 @@ public class DingoValues extends LogicalDingoValues implements DingoRel {
@Setter
private int autoIncrementColIndex;

@Getter
@Setter
private CommonId commonId;

public DingoValues(
RelOptCluster cluster,
RelTraitSet traits,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,10 @@ public interface DingoResource {

@BaseMessage("Illegal column name definition")
ExInst<DingoSqlException> invalidColumn();

@BaseMessage("Incorrect argument type to variable ''{0}''")
ExInst<DingoSqlException> incorrectArgType(String a0);

@BaseMessage("Transaction characteristics can't be changed while a transaction is in progress")
ExInst<DingoSqlException> transChangeError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ScopeVariables {
immutableVariables.add("have_ssl");
immutableVariables.add("have_statement_timeout");
immutableVariables.add("last_insert_id");
immutableVariables.add("@begin_transaction");

characterSet.add("utf8mb4");
characterSet.add("utf8");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,13 @@ private void extractAutoIncrement(RelNode relNode, String jobIdPrefix) {
if (dingoValues.getTuples().size() >= 1 &&
dingoValues.getAutoIncrementColIndex() < dingoValues.getTuples().get(0).length) {
Object autoValue = dingoValues.getTuples().get(0)[dingoValues.getAutoIncrementColIndex()];
connection.setClientInfo("last_insert_id", autoValue.toString());
connection.setClientInfo(jobIdPrefix, autoValue.toString());
if (autoValue != null) {
String autoValStr = autoValue.toString();
connection.setClientInfo("last_insert_id", autoValStr);
connection.setClientInfo(jobIdPrefix, autoValStr);
MetaService metaService = MetaService.root();
metaService.updateAutoIncrement(dingoValues.getCommonId(), Long.parseLong(autoValStr));
}
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.dingodb.exec.base.JobManager;
import org.apache.calcite.avatica.AvaticaPreparedStatement;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.proto.Common;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.avatica.util.ByteString;
import org.checkerframework.checker.nullness.qual.NonNull;
Expand Down Expand Up @@ -104,4 +105,24 @@ public void removeJob(JobManager jobManager) {
Meta.Signature signature = getSignature();
DingoStatementUtils.removeJobInSignature(jobManager, signature);
}

@Override
public void setBytes(int parameterIndex, byte[] x) throws SQLException {
if (slots.length >= parameterIndex) {
TypedValue value = slots[parameterIndex - 1];
if (value == null) {
super.setBytes(parameterIndex, x);
} else {
// base64 encode
String valStr = (String) value.value;
byte[] preBytes = ByteString.ofBase64(valStr).getBytes();
byte[] bytes = new byte[preBytes.length + x.length];
System.arraycopy(preBytes, 0, bytes, 0, preBytes.length);
System.arraycopy(x, 0, bytes, preBytes.length, x.length);
super.setBytes(parameterIndex, bytes);
}
} else {
super.setBytes(parameterIndex, x);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ UnknownTable=Error 1051(42S02): Unknown table ''{0}''
UnknownCharacterSet=Error 1115(42000): Unknown character set ''{0}''
invalidVariableArg=Error 1231(42000): Variable ''{0}'' can't be set to the value of ''{1}''
InvalidColumn=Error 1059(42000): Illegal column name definition
IncorrectArgType=ERROR 1232 (42000): Incorrect argument type to variable ''{0}''
TransChangeError=ERROR 1568 (25001): Transaction characteristics can't be changed while a transaction is in progress

# Other errors
PrimaryKeyRequired=Error 1001 (45000): Primary keys are required in table ''{0}''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
public class MysqlConnection {
@Getter
@Setter
private Integer id;
private String id;

@Getter
@Setter
private int threadId;

public SocketChannel channel;

Expand All @@ -51,6 +55,7 @@ public MysqlConnection(SocketChannel channel) {

public void setConnection(DingoConnection dingoConnection) {
connection = dingoConnection;
this.id = dingoConnection.id;
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Date;
Expand Down Expand Up @@ -183,7 +184,7 @@ public void executeSingleQuery(String sql, AtomicLong packetId,
String lastInsertId = mysqlConnection.getConnection()
.getClientInfo().getProperty(jobIdPrefix, "0");
okPacket = MysqlPacketFactory.getInstance()
.getOkPacket(count, packetId, 0, Integer.parseInt(lastInsertId));
.getOkPacket(count, packetId, 0, new BigInteger(lastInsertId));
mysqlConnection.getConnection().getClientInfo().remove(jobIdPrefix);
} else {
okPacket = MysqlPacketFactory.getInstance()
Expand Down Expand Up @@ -320,7 +321,7 @@ public void executeStatement(ExecuteStatementPacket statementPacket,
String lastInsertId = mysqlConnection.getConnection()
.getClientInfo().getProperty(jobIdPrefix, "0");
okPacket = MysqlPacketFactory.getInstance()
.getOkPacket(affected, packetId, 0, Integer.parseInt(lastInsertId));
.getOkPacket(affected, packetId, 0, new BigInteger(lastInsertId));
mysqlConnection.getConnection().getClientInfo().remove(jobIdPrefix);
} else {
okPacket = mysqlPacketFactory.getOkPacket(affected, packetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,6 @@ private static void handlerRowPacket(ResultSet resultSet, AtomicLong packetId, M
resultSetRowPacket.addColumnValue(val);
}
resultSetRowPacket.write(buffer);
int writerIndex = buffer.writerIndex();
if (writerIndex > 1048576) {
buffer.retain();
mysqlConnection.channel.writeAndFlush(buffer);
buffer.clear();
}
}
}

Expand Down Expand Up @@ -199,11 +193,6 @@ private static void handlerPrepareRowPacket(ResultSet resultSet,
resultSetRowPacket.addColumnValue(resultSet.getObject(i), mysqlConnection);
}
resultSetRowPacket.write(buffer);
int writerIndex = buffer.writerIndex();
if (writerIndex > 1048576) {
mysqlConnection.channel.writeAndFlush(buffer);
buffer.clear();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.commons.lang3.StringUtils;

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.DriverManager;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -83,7 +83,7 @@ public HandshakeHandler(MysqlConnection mysqlConnection) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
HandshakePacket handshakePacket = createHandShakePacket();
mysqlConnection.setId(handshakePacket.threadId);
mysqlConnection.setThreadId(handshakePacket.threadId);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
handshakePacket.write(buf);
ctx.writeAndFlush(buf);
Expand Down Expand Up @@ -189,7 +189,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
okPacket.affectedRows = 0;
okPacket.packetId = (byte) packetId.get();
okPacket.serverStatus = SERVER_STATUS_AUTOCOMMIT;
okPacket.insertId = 0;
okPacket.insertId = BigInteger.ZERO;
okPacket.message = "connect success".getBytes();
okPacket.write(buffer);
ctx.writeAndFlush(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,25 @@

import io.dingodb.common.concurrent.ThreadPoolBuilder;
import io.dingodb.driver.mysql.MysqlConnection;
import io.dingodb.net.netty.Connection;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.Builder;
import lombok.Getter;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

@Getter
@Builder
public class MysqlNettyServer {
public final String host;
public final int port;
public static final Map<Integer, MysqlConnection> connections = new ConcurrentHashMap<>();
public static final Map<String, MysqlConnection> connections = new ConcurrentHashMap<>();

private EventLoopGroup eventLoopGroup;
private ServerBootstrap server;
Expand Down
Loading

0 comments on commit 49a527f

Please sign in to comment.