Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding average rate logic in query service #87

Merged
merged 71 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
9ca540d
init commit
Sep 23, 2021
8ce1e48
updated tests
Sep 23, 2021
a93b752
removed debug statement
Sep 23, 2021
ae7e640
modified test
Sep 23, 2021
aad7af7
added support for sum and avg_rate
Sep 23, 2021
7f0097c
updated test files
Sep 23, 2021
089cb3c
refactored
Sep 24, 2021
4f2692f
added test
Sep 24, 2021
334b090
refactored
Sep 24, 2021
f0c2675
changed map signature
Sep 24, 2021
d53b439
refactored the code
Sep 24, 2021
216ace8
updated tests
Sep 24, 2021
711325a
restored gradle changes
Sep 24, 2021
33ebefa
refactored convert method
Sep 24, 2021
3bd93d6
refactored
Sep 24, 2021
67f55da
refactored avg_rate functions
Sep 24, 2021
c1ebc65
added comments for avg_rate
Sep 24, 2021
7dd43eb
added AVG_RATE in pinot
Sep 27, 2021
305632f
removed unused code
Sep 27, 2021
9cca2be
removed unused code
Sep 27, 2021
ee60629
modified comment
Sep 28, 2021
246600d
added unit test
Sep 28, 2021
18781c2
added info in execution context
Oct 1, 2021
50203ef
changed function signature to pass executionContext
Oct 1, 2021
c7d51dd
added avg_rate support
Oct 1, 2021
20b4073
merged main to run spotless
Oct 4, 2021
c4cd3f2
refactored
Oct 4, 2021
5b77e64
refactored executioncontext file
Oct 4, 2021
beeed4b
resolved bugs
Oct 4, 2021
c006e70
refactored
Oct 4, 2021
69a500f
resolved comments
Oct 4, 2021
7cfb5a7
refactored
Oct 5, 2021
93cb20b
updated test
Oct 5, 2021
f1c9029
resolved PR comments
Oct 5, 2021
88b66d7
resolved PR comments
Oct 5, 2021
029f188
changed functionToStringForAvgRate function
Oct 11, 2021
f322ed5
resolved pr comments
Oct 11, 2021
2041c72
added tree traversal for filter
Oct 11, 2021
83e1d1e
resolved comments
Oct 11, 2021
3b9180a
read time attribute from config
Oct 11, 2021
a159461
refactored implementation
Oct 11, 2021
4f8ddaf
.
Oct 11, 2021
67b9e9c
removed comments
Oct 11, 2021
e76db76
refactored
Oct 12, 2021
ab7dcfe
renamed method
Oct 12, 2021
b87da0b
refactored
Oct 12, 2021
dcb6e04
added unit test for changed handler interface
Oct 13, 2021
db84e83
added test for pinotFunctionConverter
Oct 13, 2021
81e6b1d
added unit test
Oct 13, 2021
4002459
resolved PR comments
Oct 13, 2021
b735a55
changed tree traversal signature
Oct 13, 2021
909f014
added unit test
Oct 13, 2021
a1e298c
resolved PR comments
Oct 13, 2021
bc0c19c
minor refactorings
Oct 13, 2021
a4fac59
resolved PR comments
Oct 13, 2021
cd3036e
resolved PR comments
Oct 13, 2021
6f9fa28
updated time in unit tests
Oct 13, 2021
4a49ce0
minor fix
Oct 13, 2021
5378145
refactored
Oct 13, 2021
8ba8fae
resolved PR comments
Oct 14, 2021
49a8f9a
resolved comments
Oct 14, 2021
8e9585f
resolved comments
Oct 14, 2021
a5438ec
refactored test files
Oct 14, 2021
b9e8f1f
refactoring
Oct 14, 2021
f182691
minor refactorings
Oct 14, 2021
31c1633
resolved PR comments
Oct 14, 2021
7052c4a
resolved PR comments
Oct 14, 2021
8d6152d
nit changes
Oct 14, 2021
9c3606e
refactored integration tests
Oct 14, 2021
12f8c3e
resolved comments
Oct 14, 2021
21acc1f
resolved PR comments
Oct 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.hypertrace.core.query.service;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.hypertrace.core.query.service.api.ColumnIdentifier;
import org.hypertrace.core.query.service.api.ColumnMetadata;
import org.hypertrace.core.query.service.api.Expression;
Expand Down Expand Up @@ -35,14 +39,62 @@ public class ExecutionContext {
// while the selectedColumns
// is a set of column names.
private final LinkedHashSet<Expression> allSelections;
private final Optional<Double> timeSeriesPeriod;
private final double timeRangeDuration;

public ExecutionContext(String tenantId, QueryRequest request) {
this.tenantId = tenantId;
this.selectedColumns = new LinkedHashSet<>();
this.allSelections = new LinkedHashSet<>();
this.timeSeriesPeriod = setTimeSeriesPeriod(request);
this.timeRangeDuration = setTimeRangeDuration(request);
analyze(request);
}

private Optional<Double> setTimeSeriesPeriod(QueryRequest request) {
Optional<Double> period = Optional.empty();
if (request.getGroupByCount() > 0) {
for (Expression expression : request.getGroupByList()) {
if (expression.getValueCase() == ValueCase.FUNCTION
&& expression.getFunction().getFunctionName().equals("dateTimeConvert")) {
String periodInIso =
expression
.getFunction()
.getArgumentsList()
.get(3)
.getLiteral()
.getValue()
.getString();
period = Optional.of((double) isoDurationToSeconds(periodInIso));
}
}
}
return period;
}

private double setTimeRangeDuration(QueryRequest request) {
Optional<Long> duration = Optional.empty();
if (request.getFilter().getChildFilterCount() > 0) {
for (Filter filter : request.getFilter().getChildFilterList()) {
// will extract columnName as a part of follow up PR
String columnName = filter.getLhs().getColumnIdentifier().getColumnName();
int colLength = columnName.length();
if (colLength >= 9 && columnName.substring(colLength - 9).equals("startTime")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may not start with startTime either - that's not an assumption we want in code. We want to read this col name out of config if we can, I think another comment pointed out where that is. Not sure if we have the context about which view will be selected yet here though, so at a minimum, we should isolate this assumption into its own function that can be fixed later (another option is to take a heuristic approach like look for a pair of filters that are doing >= long and < long, but probably overkill for now).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never check operators here. What if I put a filter on it like startTime = X ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. As discussed with @kotharironak, we will read it from a config file in a later follow up PR. Also possibility of startTime = X was ruled out in the discussion.

Copy link
Contributor

@kotharironak kotharironak Oct 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the discussion, I was suggesting two things.

  1. handle the config related changes at the end
  2. get the time filter using pair (startTime LT time && startTime GT time)

What do we have to do?

If we agree to use this config (https://github.com/hypertrace/query-service/blob/main/query-service/src/main/resources/configs/common/application.conf#L29 ) as our time filter column, for now, we can add this information in execution context in handler selection. This will also require RequstHandler to expose a method something like getTimeFilterAttribute.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given how we generally translate the time range filter (towards the top) I think I'd actually do a breadth first rather than DFS/in-order, but it shouldn't really matter. The important difference from a regular tree walk that will need to be accounted for either way - only recursing through AND nodes. As soon as you hit an OR, you can turn discard that node because a time range can never be OR'd.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, agree on using that config - just want to make sure we're careful about any dependencies between parsing the query and handler selection (we need to parse to do handler selection, but we need handler selected to parse out this info).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rish691

Long val = filter.getRhs().getLiteral().getValue().getLong();
if (duration.isPresent()) {
val = val - duration.get();
}
duration = Optional.of(Math.abs(val));
}
}
}

if (duration.isPresent()) {
return (double) TimeUnit.SECONDS.convert(duration.get(), TimeUnit.MILLISECONDS);
}
return 0;
}

private void analyze(QueryRequest request) {
List<String> filterColumns = new ArrayList<>();
LinkedList<Filter> filterQueue = new LinkedList<>();
Expand Down Expand Up @@ -154,6 +206,11 @@ private void extractColumns(List<String> columns, Expression expression) {
}
}

private static long isoDurationToSeconds(String duration) {
Duration d = java.time.Duration.parse(duration);
return d.get(ChronoUnit.SECONDS);
}

public String getTenantId() {
return this.tenantId;
}
Expand All @@ -173,4 +230,12 @@ public LinkedHashSet<String> getSelectedColumns() {
public LinkedHashSet<Expression> getAllSelections() {
return this.allSelections;
}

public Optional<Double> getTimeSeriesPeriod() {
return this.timeSeriesPeriod;
}

public double getTimeRangeDuration() {
return this.timeRangeDuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public interface QueryFunctionConstants {
String QUERY_FUNCTION_MIN = "MIN";
String QUERY_FUNCTION_MAX = "MAX";
String QUERY_FUNCTION_COUNT = "COUNT";
String QUERY_FUNCTION_AVG_RATE = "AVG_RATE";
String QUERY_FUNCTION_PERCENTILE = "PERCENTILE";
String QUERY_FUNCTION_DISTINCTCOUNT = "DISTINCTCOUNT";
String QUERY_FUNCTION_CONCAT = "CONCAT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.hypertrace.core.query.service.api.SortOrder;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.api.ValueType;
import org.hypertrace.core.query.service.pinot.Params.Builder;
import org.hypertrace.core.query.service.pinot.converters.DestinationColumnValueConverter;
import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,7 +64,7 @@ Entry<String, Params> toSQL(
// how it is created.
for (Expression expr : allSelections) {
pqlBuilder.append(delim);
pqlBuilder.append(convertExpression2String(expr, paramsBuilder));
pqlBuilder.append(convertExpression2String(expr, paramsBuilder, executionContext));
delim = ", ";
}

Expand All @@ -75,7 +76,8 @@ Entry<String, Params> toSQL(

if (request.hasFilter()) {
pqlBuilder.append(" AND ");
String filterClause = convertFilter2String(request.getFilter(), paramsBuilder);
String filterClause =
convertFilter2String(request.getFilter(), paramsBuilder, executionContext);
pqlBuilder.append(filterClause);
}

Expand All @@ -84,7 +86,8 @@ Entry<String, Params> toSQL(
delim = "";
for (Expression groupByExpression : request.getGroupByList()) {
pqlBuilder.append(delim);
pqlBuilder.append(convertExpression2String(groupByExpression, paramsBuilder));
pqlBuilder.append(
convertExpression2String(groupByExpression, paramsBuilder, executionContext));
delim = ", ";
}
}
Expand All @@ -93,7 +96,9 @@ Entry<String, Params> toSQL(
delim = "";
for (OrderByExpression orderByExpression : request.getOrderByList()) {
pqlBuilder.append(delim);
String orderBy = convertExpression2String(orderByExpression.getExpression(), paramsBuilder);
String orderBy =
convertExpression2String(
orderByExpression.getExpression(), paramsBuilder, executionContext);
pqlBuilder.append(orderBy);
if (SortOrder.DESC.equals(orderByExpression.getOrder())) {
pqlBuilder.append(" desc ");
Expand All @@ -119,15 +124,16 @@ Entry<String, Params> toSQL(
return new SimpleEntry<>(pqlBuilder.toString(), paramsBuilder.build());
}

private String convertFilter2String(Filter filter, Params.Builder paramsBuilder) {
private String convertFilter2String(
Filter filter, Builder paramsBuilder, ExecutionContext executionContext) {
StringBuilder builder = new StringBuilder();
String operator = convertOperator2String(filter.getOperator());
if (filter.getChildFilterCount() > 0) {
String delim = "";
builder.append("( ");
for (Filter childFilter : filter.getChildFilterList()) {
builder.append(delim);
builder.append(convertFilter2String(childFilter, paramsBuilder));
builder.append(convertFilter2String(childFilter, paramsBuilder, executionContext));
builder.append(" ");
delim = operator + " ";
}
Expand All @@ -140,9 +146,10 @@ private String convertFilter2String(Filter filter, Params.Builder paramsBuilder)
handleValueConversionForLiteralExpression(filter.getLhs(), filter.getRhs());
builder.append(operator);
builder.append("(");
builder.append(convertExpression2String(filter.getLhs(), paramsBuilder));
builder.append(
convertExpression2String(filter.getLhs(), paramsBuilder, executionContext));
builder.append(",");
builder.append(convertExpression2String(rhs, paramsBuilder));
builder.append(convertExpression2String(rhs, paramsBuilder, executionContext));
builder.append(")");
break;
case CONTAINS_KEY:
Expand Down Expand Up @@ -175,11 +182,12 @@ private String convertFilter2String(Filter filter, Params.Builder paramsBuilder)
break;
default:
rhs = handleValueConversionForLiteralExpression(filter.getLhs(), filter.getRhs());
builder.append(convertExpression2String(filter.getLhs(), paramsBuilder));
builder.append(
convertExpression2String(filter.getLhs(), paramsBuilder, executionContext));
builder.append(" ");
builder.append(operator);
builder.append(" ");
builder.append(convertExpression2String(rhs, paramsBuilder));
builder.append(convertExpression2String(rhs, paramsBuilder, executionContext));
}
}
return builder.toString();
Expand Down Expand Up @@ -251,7 +259,8 @@ private String convertOperator2String(Operator operator) {
}
}

private String convertExpression2String(Expression expression, Params.Builder paramsBuilder) {
private String convertExpression2String(
Expression expression, Builder paramsBuilder, ExecutionContext executionContext) {
switch (expression.getValueCase()) {
case COLUMNIDENTIFIER:
String logicalColumnName = expression.getColumnIdentifier().getColumnName();
Expand All @@ -262,11 +271,13 @@ private String convertExpression2String(Expression expression, Params.Builder pa
return convertLiteralToString(expression.getLiteral(), paramsBuilder);
case FUNCTION:
return this.functionConverter.convert(
executionContext,
expression.getFunction(),
argExpression -> convertExpression2String(argExpression, paramsBuilder));
argExpression ->
convertExpression2String(argExpression, paramsBuilder, executionContext));
case ORDERBY:
OrderByExpression orderBy = expression.getOrderBy();
return convertExpression2String(orderBy.getExpression(), paramsBuilder);
return convertExpression2String(orderBy.getExpression(), paramsBuilder, executionContext);
case VALUE_NOT_SET:
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package org.hypertrace.core.query.service.pinot.converters;

import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_AVG_RATE;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_CONCAT;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_COUNT;
import static org.hypertrace.core.query.service.QueryFunctionConstants.QUERY_FUNCTION_PERCENTILE;

import java.util.Optional;
import java.util.stream.Collectors;
import org.hypertrace.core.query.service.ExecutionContext;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Function;
import org.hypertrace.core.query.service.api.Function.Builder;
import org.hypertrace.core.query.service.api.LiteralConstant;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.api.ValueType;
Expand All @@ -16,6 +19,9 @@ public class PinotFunctionConverter {
/**
* Computing PERCENTILE in Pinot is resource intensive. T-Digest calculation is much faster and
* reasonably accurate, hence use that as the default.
*
* <p>AVG_RATE not supported directly in Pinot. So AVG_RATE is computed by summing over all values
* and then dividing by a constant.
*/
private static final String DEFAULT_PERCENTILE_AGGREGATION_FUNCTION = "PERCENTILETDIGEST";

Expand All @@ -34,24 +40,73 @@ public PinotFunctionConverter() {
}

public String convert(
Function function, java.util.function.Function<Expression, String> argumentConverter) {
ExecutionContext executionContext,
Function function,
java.util.function.Function<Expression, String> argumentConverter) {
switch (function.getFunctionName().toUpperCase()) {
case QUERY_FUNCTION_COUNT:
return this.convertCount();
case QUERY_FUNCTION_PERCENTILE:
return this.functionToString(this.toPinotPercentile(function), argumentConverter);
case QUERY_FUNCTION_CONCAT:
return this.functionToString(this.toPinotConcat(function), argumentConverter);
case QUERY_FUNCTION_AVG_RATE:
return this.functionToStringForAvgRate(function, argumentConverter, executionContext);
default:
// TODO remove once pinot-specific logic removed from gateway - this normalization reverts
// that logic
if (this.isHardcodedPercentile(function)) {
return this.convert(this.normalizeHardcodedPercentile(function), argumentConverter);
return this.convert(
executionContext, this.normalizeHardcodedPercentile(function), argumentConverter);
}
return this.functionToString(function, argumentConverter);
}
}

private String functionToStringForAvgRate(
Function function,
java.util.function.Function<Expression, String> argumentConverter,
ExecutionContext executionContext) {
function = updateFunctionForAvgRate(function, executionContext);
String argumentString =
function.getArgumentsList().stream()
.map(argumentConverter::apply)
.collect(Collectors.joining(","));

return "SUM(DIV(" + argumentString + "))";
}

private Function updateFunctionForAvgRate(Function function, ExecutionContext executionContext) {

Expression columnName = function.getArgumentsList().get(0);
Expression literal = function.getArgumentsList().get(1);

Builder builder = function.toBuilder();
builder.clearArguments();
builder.addArguments(columnName);
builder.addArguments(getUpdatedLiteral(literal, executionContext));

return builder.build();
}

private Expression getUpdatedLiteral(Expression literal, ExecutionContext executionContext) {

long rateIntervalInSeconds = literal.getLiteral().getValue().getLong();
double aggregateIntervalInSeconds =
executionContext.getTimeSeriesPeriod().orElse(executionContext.getTimeRangeDuration());

return Expression.newBuilder()
.setLiteral(
LiteralConstant.newBuilder()
.setValue(
Value.newBuilder()
.setDouble(aggregateIntervalInSeconds / rateIntervalInSeconds)
.setValueType(ValueType.DOUBLE)
.build())
.build())
.build();
}

private String functionToString(
Function function, java.util.function.Function<Expression, String> argumentConverter) {
String argumentString =
Expand Down
Loading