Skip to content

Commit

Permalink
Add tracing support for internals and JSON-RPC
Browse files Browse the repository at this point in the history
Signed-off-by: Antoine Toulme <[email protected]>
  • Loading branch information
atoulme committed Nov 12, 2020
1 parent 45fd9f8 commit 9303e60
Show file tree
Hide file tree
Showing 18 changed files with 343 additions and 124 deletions.
1 change: 1 addition & 0 deletions ethereum/api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies {

implementation 'com.google.guava:guava'
implementation 'com.graphql-java:graphql-java'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.vertx:vertx-auth-jwt'
implementation 'io.vertx:vertx-core'
implementation 'io.vertx:vertx-unit'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand All @@ -81,6 +86,7 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
Expand All @@ -104,6 +110,7 @@ public class JsonRpcHttpService {
private final NatService natService;
private final Path dataDir;
private final LabelledMetric<OperationTimer> requestTimer;
private final Tracer tracer;

@VisibleForTesting public final Optional<AuthenticationService> authenticationService;

Expand Down Expand Up @@ -169,6 +176,7 @@ private JsonRpcHttpService(
this.authenticationService = authenticationService;
this.livenessService = livenessService;
this.readinessService = readinessService;
this.tracer = OpenTelemetry.getGlobalTracer("io.hyperledger.besu.jsonrpc", "1.0.0");
}

private void validateConfig(final JsonRpcConfiguration config) {
Expand Down Expand Up @@ -229,6 +237,7 @@ public CompletableFuture<?> start() {
private Router buildRouter() {
// Handle json rpc requests
final Router router = Router.router(vertx);
router.route().handler(this::createSpan);

// Verify Host header to avoid rebind attack.
router.route().handler(checkAllowlistHostHeader());
Expand Down Expand Up @@ -279,6 +288,26 @@ private Router buildRouter() {
return router;
}

private void createSpan(final RoutingContext routingContext) {
final SocketAddress address = routingContext.request().connection().remoteAddress();
final Span serverSpan =
tracer
.spanBuilder(address.host() + ":" + address.port())
.setSpanKind(Span.Kind.SERVER)
.startSpan();

routingContext.addBodyEndHandler(event -> serverSpan.end());
routingContext.addEndHandler(
event -> {
if (event.failed()) {
serverSpan.recordException(event.cause());
serverSpan.setStatus(StatusCode.ERROR);
}
serverSpan.end();
});
routingContext.next();
}

private HttpServerOptions getHttpServerOptions() {
final HttpServerOptions httpServerOptions =
new HttpServerOptions()
Expand Down Expand Up @@ -590,40 +619,55 @@ private JsonRpcResponse process(
} catch (final IllegalArgumentException exception) {
return errorResponse(id, INVALID_REQUEST);
}
// Handle notifications
if (requestBody.isNotification()) {
// Notifications aren't handled so create empty result for now.
return NO_RESPONSE;
}
Span span =
tracer
.spanBuilder(requestBody.getMethod())
.setSpanKind(Span.Kind.INTERNAL)
.setParent(Context.current())
.startSpan();
try {
// Handle notifications
if (requestBody.isNotification()) {
// Notifications aren't handled so create empty result for now.
return NO_RESPONSE;
}

final Optional<JsonRpcError> unavailableMethod = validateMethodAvailability(requestBody);
if (unavailableMethod.isPresent()) {
return errorResponse(id, unavailableMethod.get());
}
final Optional<JsonRpcError> unavailableMethod = validateMethodAvailability(requestBody);
if (unavailableMethod.isPresent()) {
span.setStatus(StatusCode.ERROR, "method unavailable");
return errorResponse(id, unavailableMethod.get());
}

final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod());
final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod());

if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
// Generate response
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(requestBody.getMethod()).startTimer()) {
if (user.isPresent()) {
if (AuthenticationUtils.isPermitted(authenticationService, user, method)) {
// Generate response
try (final OperationTimer.TimingContext ignored =
requestTimer.labels(requestBody.getMethod()).startTimer()) {
if (user.isPresent()) {
return method.response(
new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed()));
}
return method.response(
new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed()));
new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed()));
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
span.setStatus(StatusCode.ERROR, "Invalid Params");
return errorResponse(id, JsonRpcError.INVALID_PARAMS);
} catch (final MultiTenancyValidationException e) {
span.setStatus(StatusCode.ERROR, "Unauthorized");
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} catch (final RuntimeException e) {
LOG.error("Error processing JSON-RPC requestBody", e);
span.setStatus(StatusCode.ERROR, "Error processing JSON-RPC requestBody");
return errorResponse(id, JsonRpcError.INTERNAL_ERROR);
}
return method.response(
new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed()));
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
return errorResponse(id, JsonRpcError.INVALID_PARAMS);
} catch (final MultiTenancyValidationException e) {
} else {
span.setStatus(StatusCode.ERROR, "Unauthorized");
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} catch (final RuntimeException e) {
LOG.error("Error processing JSON-RPC requestBody", e);
return errorResponse(id, JsonRpcError.INTERNAL_ERROR);
}
} else {
return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED);
} finally {
span.end();
}
}

Expand Down
1 change: 1 addition & 0 deletions ethereum/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {

implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.guava:guava'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'io.vertx:vertx-core'
implementation 'net.java.dev.jna:jna'
implementation 'org.apache.logging.log4j:log4j-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import java.util.List;

import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -46,6 +50,9 @@ TransactionReceipt create(

private static final Logger LOG = LogManager.getLogger();

private static final Tracer tracer =
OpenTelemetry.getGlobalTracer("io.hyperledger.besu.block", "1.0.0");

static final int MAX_GENERATION = 6;

public static class Result implements BlockProcessor.Result {
Expand Down Expand Up @@ -117,65 +124,74 @@ public AbstractBlockProcessor.Result processBlock(
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final PrivateMetadataUpdater privateMetadataUpdater) {

final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
for (final Transaction transaction : transactions) {
final long remainingGasBudget = blockHeader.getGasLimit() - currentGasUsed;
if (!gasBudgetCalculator.hasBudget(
transaction, blockHeader.getNumber(), blockHeader.getGasLimit(), currentGasUsed)) {
LOG.info(
"Block processing error: transaction gas limit {} exceeds available block budget"
+ " remaining {}. Block {} Transaction {}",
transaction.getGasLimit(),
remainingGasBudget,
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
return AbstractBlockProcessor.Result.failed();
Span globalProcessBlock =
tracer.spanBuilder("processBlock").setSpanKind(Span.Kind.INTERNAL).startSpan();
try {

final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
for (final Transaction transaction : transactions) {
final long remainingGasBudget = blockHeader.getGasLimit() - currentGasUsed;
if (!gasBudgetCalculator.hasBudget(
transaction, blockHeader.getNumber(), blockHeader.getGasLimit(), currentGasUsed)) {
LOG.info(
"Block processing error: transaction gas limit {} exceeds available block budget"
+ " remaining {}. Block {} Transaction {}",
transaction.getGasLimit(),
remainingGasBudget,
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
globalProcessBlock.setStatus(StatusCode.ERROR, "Block processing error with gas limit");
return AbstractBlockProcessor.Result.failed();
}

final WorldUpdater worldStateUpdater = worldState.updater();
final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain);
final Address miningBeneficiary =
miningBeneficiaryCalculator.calculateBeneficiary(blockHeader);

final TransactionProcessingResult result =
transactionProcessor.processTransaction(
blockchain,
worldStateUpdater,
blockHeader,
transaction,
miningBeneficiary,
OperationTracer.NO_TRACING,
blockHashLookup,
true,
TransactionValidationParams.processingBlock(),
privateMetadataUpdater);
if (result.isInvalid()) {
LOG.info(
"Block processing error: transaction invalid '{}'. Block {} Transaction {}",
result.getValidationResult().getInvalidReason(),
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
globalProcessBlock.setStatus(StatusCode.ERROR, "Invalid transaction");
return AbstractBlockProcessor.Result.failed();
}

worldStateUpdater.commit();

currentGasUsed += transaction.getGasLimit() - result.getGasRemaining();

final TransactionReceipt transactionReceipt =
transactionReceiptFactory.create(result, worldState, currentGasUsed);
receipts.add(transactionReceipt);
}

final WorldUpdater worldStateUpdater = worldState.updater();
final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain);
final Address miningBeneficiary =
miningBeneficiaryCalculator.calculateBeneficiary(blockHeader);

final TransactionProcessingResult result =
transactionProcessor.processTransaction(
blockchain,
worldStateUpdater,
blockHeader,
transaction,
miningBeneficiary,
OperationTracer.NO_TRACING,
blockHashLookup,
true,
TransactionValidationParams.processingBlock(),
privateMetadataUpdater);
if (result.isInvalid()) {
LOG.info(
"Block processing error: transaction invalid '{}'. Block {} Transaction {}",
result.getValidationResult().getInvalidReason(),
blockHeader.getHash().toHexString(),
transaction.getHash().toHexString());
if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
globalProcessBlock.setStatus(StatusCode.ERROR, "Coinbase reward error");
// no need to log, rewardCoinbase logs the error.
return AbstractBlockProcessor.Result.failed();
}

worldStateUpdater.commit();

currentGasUsed += transaction.getGasLimit() - result.getGasRemaining();

final TransactionReceipt transactionReceipt =
transactionReceiptFactory.create(result, worldState, currentGasUsed);
receipts.add(transactionReceipt);
}

if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) {
// no need to log, rewardCoinbase logs the error.
return AbstractBlockProcessor.Result.failed();
worldState.persist();
return AbstractBlockProcessor.Result.successful(receipts);
} finally {
globalProcessBlock.end();
}

worldState.persist();
return AbstractBlockProcessor.Result.successful(receipts);
}

protected MiningBeneficiaryCalculator getMiningBeneficiaryCalculator() {
Expand Down
1 change: 1 addition & 0 deletions ethereum/trie/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':services:kvstore')

implementation 'com.google.guava:guava'
implementation 'io.opentelemetry:opentelemetry-api'
implementation 'org.apache.tuweni:bytes'
implementation 'org.bouncycastle:bcprov-jdk15on'

Expand Down
Loading

0 comments on commit 9303e60

Please sign in to comment.