Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add monitoring routes #134

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/client_eventstreams_delete.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientEventStreamsDeleteCommand(clientFactory func() (apiclient.FFTMClient,
Use: "delete",
Short: "Delete event streams",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/client_eventstreams_list.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientEventStreamsListCommand(clientFactory func() (apiclient.FFTMClient, e
Use: "list",
Short: "List event streams",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/client_listeners_delete.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientListenersDeleteCommand(clientFactory func() (apiclient.FFTMClient, er
Use: "delete",
Short: "Delete event streams",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/client_listeners_list.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientListenersListCommand(clientFactory func() (apiclient.FFTMClient, erro
Use: "list",
Short: "List listeners",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -41,7 +41,7 @@ func buildLeveldb2postgresCommand(initConfig func() error) *cobra.Command {
leveldb2postgresEventStreamsCmd := &cobra.Command{
Use: "leveldb2postgres",
Short: "Migrate from LevelDB to PostgreSQL persistence",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
if err := initConfig(); err != nil {
return err
}
Expand Down
44 changes: 42 additions & 2 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|address|The IP address on which the metrics HTTP API should listen|`int`|`127.0.0.1`
|enabled|Enables the metrics API|`boolean`|`false`
|path|The path from which to serve the Prometheus metrics|`string`|`/metrics`
|enabled|Deprecated: Please use 'monitoring.enabled' instead|`boolean`|`false`
|path|Deprecated: Please use 'monitoring.metricsPath' instead|`string`|`/metrics`
|port|The port on which the metrics HTTP API should listen|`int`|`6000`
|publicURL|The fully qualified public URL for the metrics API. This is used for building URLs in HTTP responses and in OpenAPI Spec generation|URL `string`|`<nil>`
|readTimeout|The maximum time to wait when reading from an HTTP connection|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`
Expand Down Expand Up @@ -210,6 +210,46 @@
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## monitoring

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|address|Listener address|`int`|`127.0.0.1`
|enabled|Enables the monitoring APIs|`boolean`|`false`
|metricsPath|The path from which to serve the Prometheus metrics|`string`|`/metrics`
|port|Listener port|`int`|`6000`
|publicURL|Externally available URL for the HTTP endpoint|`string`|`<nil>`
|readTimeout|HTTP server read timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`
|shutdownTimeout|HTTP server shutdown timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`
|writeTimeout|HTTP server write timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`

## monitoring.auth

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|type|The auth plugin to use for server side authentication of requests|`string`|`<nil>`

## monitoring.auth.basic

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|passwordfile|The path to a .htpasswd file to use for authenticating requests. Passwords should be hashed with bcrypt.|`string`|`<nil>`

## monitoring.tls

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|ca|The TLS certificate authority in PEM format (this option is ignored if caFile is also set)|`string`|`<nil>`
|caFile|The path to the CA file for TLS on this API|`string`|`<nil>`
|cert|The TLS certificate in PEM format (this option is ignored if certFile is also set)|`string`|`<nil>`
|certFile|The path to the certificate file for TLS on this API|`string`|`<nil>`
|clientAuth|Enables or disables client auth for TLS on this API|`string`|`<nil>`
|enabled|Enables or disables TLS on this API|`boolean`|`false`
|insecureSkipHostVerify|When to true in unit test development environments to disable TLS verification. Use with extreme caution|`boolean`|`<nil>`
|key|The TLS certificate key in PEM format (this option is ignored if keyFile is also set)|`string`|`<nil>`
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## persistence

|Key|Description|Type|Default Value|
Expand Down
10 changes: 5 additions & 5 deletions internal/apiclient/eventstreams.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -31,7 +31,7 @@ func (c *fftmClient) GetEventStreams(ctx context.Context) ([]apitypes.EventStrea
SetResult(&eventStreams).
Get("eventstreams")
if !resp.IsSuccess() {
return nil, fmt.Errorf(string(resp.Body()))
return nil, fmt.Errorf("%s", string(resp.Body()))
}
return eventStreams, err
}
Expand All @@ -43,7 +43,7 @@ func (c *fftmClient) GetListeners(ctx context.Context, eventStreamID string) ([]
SetResult(&listeners).
Get(fmt.Sprintf("eventstreams/%s/listeners", eventStreamID))
if !resp.IsSuccess() {
return nil, fmt.Errorf(string(resp.Body()))
return nil, fmt.Errorf("%s", string(resp.Body()))
}
return listeners, err
}
Expand All @@ -56,7 +56,7 @@ func (c *fftmClient) DeleteEventStream(ctx context.Context, eventStreamID string
return err
}
if !resp.IsSuccess() {
return fmt.Errorf(string(resp.Body()))
return fmt.Errorf("%s", string(resp.Body()))
}
return nil
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func (c *fftmClient) DeleteListener(ctx context.Context, eventStreamID, listener
return err
}
if !resp.IsSuccess() {
return fmt.Errorf(string(resp.Body()))
return fmt.Errorf("%s", string(resp.Body()))
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -311,6 +311,7 @@ func (bcm *blockConfirmationManager) getBlockByHash(blockHash string) (*apitypes

func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, allowCache bool, expectedParentHash string) (*apitypes.BlockInfo, error) {
res, reason, err := bcm.connector.BlockInfoByNumber(bcm.ctx, &ffcapi.BlockInfoByNumberRequest{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(blockNumber)),
AllowCache: allowCache,
ExpectedParentHash: expectedParentHash,
Expand Down
3 changes: 2 additions & 1 deletion internal/confirmations/confirmed_block_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -357,6 +357,7 @@ func (cbl *confirmedBlockListener) dispatchAllConfirmed() {
BlockEvent: &ffcapi.BlockEvent{
ListenerID: cbl.id,
BlockInfo: ffcapi.BlockInfo{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(block.BlockNumber)),
BlockHash: block.BlockHash,
ParentHash: block.ParentHash,
Expand Down
13 changes: 10 additions & 3 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -65,6 +65,7 @@ var esDefaults struct {
}

func InitDefaults() {
//nolint:gosec
esDefaults.batchSize = config.GetInt64(tmconfig.EventStreamsDefaultsBatchSize)
esDefaults.batchTimeout = fftypes.FFDuration(config.GetDuration(tmconfig.EventStreamsDefaultsBatchTimeout))
esDefaults.errorHandling = fftypes.FFEnum(config.GetString(tmconfig.EventStreamsDefaultsErrorHandling))
Expand Down Expand Up @@ -218,6 +219,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda

// Batch timeout
if updates.EthCompatBatchTimeoutMS != nil {
//nolint:gosec
dv := fftypes.FFDuration(*updates.EthCompatBatchTimeoutMS) * fftypes.FFDuration(time.Millisecond)
changed = apitypes.CheckUpdateDuration(changed, &merged.BatchTimeout, base.BatchTimeout, &dv, esDefaults.batchTimeout)
} else {
Expand All @@ -226,6 +228,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda

// Retry timeout
if updates.EthCompatRetryTimeoutSec != nil {
//nolint:gosec
dv := fftypes.FFDuration(*updates.EthCompatRetryTimeoutSec) * fftypes.FFDuration(time.Second)
changed = apitypes.CheckUpdateDuration(changed, &merged.RetryTimeout, base.RetryTimeout, &dv, esDefaults.retryTimeout)
} else {
Expand All @@ -234,6 +237,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda

// Blocked retry delay
if updates.EthCompatBlockedRetryDelaySec != nil {
//nolint:gosec
dv := fftypes.FFDuration(*updates.EthCompatBlockedRetryDelaySec) * fftypes.FFDuration(time.Second)
changed = apitypes.CheckUpdateDuration(changed, &merged.BlockedRetryDelay, base.BlockedRetryDelay, &dv, esDefaults.blockedRetryDelay)
} else {
Expand Down Expand Up @@ -509,7 +513,8 @@ func (es *eventStream) Start(ctx context.Context) error {
startTime: fftypes.Now(),
eventLoopDone: make(chan struct{}),
batchLoopDone: make(chan struct{}),
updates: make(chan *ffcapi.ListenerEvent, int(*es.spec.BatchSize)),
//nolint:gosec
updates: make(chan *ffcapi.ListenerEvent, int(*es.spec.BatchSize)),
}
startedState.ctx, startedState.cancelCtx = context.WithCancel(es.bgCtx)
es.currentState = startedState
Expand Down Expand Up @@ -781,6 +786,8 @@ func (es *eventStream) checkConfirmedEventForBatch(e *ffcapi.ListenerEvent) (l *
func (es *eventStream) batchLoop(startedState *startedStreamState) {
defer close(startedState.batchLoopDone)
ctx := startedState.ctx

//nolint:gosec
maxSize := int(*es.spec.BatchSize)
batchNumber := int64(0)

Expand Down Expand Up @@ -955,7 +962,7 @@ func (es *eventStream) writeCheckpoint(startedState *startedStreamState, batch *
}

// We only return if the context is cancelled, or the checkpoint succeeds
return es.retry.Do(startedState.ctx, "checkpoint", func(attempt int) (retry bool, err error) {
return es.retry.Do(startedState.ctx, "checkpoint", func(_ int) (retry bool, err error) {
return true, es.persistence.WriteCheckpoint(startedState.ctx, cp)
})
}
Expand Down
9 changes: 6 additions & 3 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -58,7 +58,7 @@ func NewMetricsManager(ctx context.Context) Metrics {
)
mm := &metricsManager{
ctx: ctx,
metricsEnabled: config.GetBool(tmconfig.MetricsEnabled),
metricsEnabled: config.GetBool(tmconfig.DeprecatedMetricsEnabled) || config.GetBool(tmconfig.MonitoringEnabled),
timeMap: make(map[string]time.Time),
metricsRegistry: metricsRegistry,
eventsMetricsManager: eventsMetricsManager,
Expand All @@ -74,7 +74,10 @@ func (mm *metricsManager) IsMetricsEnabled() bool {
}

func (mm *metricsManager) HTTPHandler() http.Handler {
httpHandler, _ := mm.metricsRegistry.HTTPHandler(mm.ctx, promhttp.HandlerOpts{})
httpHandler, err := mm.metricsRegistry.HTTPHandler(mm.ctx, promhttp.HandlerOpts{})
if err != nil {
panic(err)
}
return httpHandler
}

Expand Down
7 changes: 4 additions & 3 deletions internal/persistence/leveldb/leveldb_persistence.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -478,6 +478,7 @@ func (p *leveldbPersistence) InsertTransactionWithNextNonce(ctx context.Context,
// From this point on, we will guide this transaction through to submission.
// We return an "ack" at this point, and dispatch the work of getting the transaction submitted
// to the background worker.
//nolint:gosec // Safe conversion as nonce is always positive
tx.Nonce = fftypes.NewFFBigInt(int64(lockedNonce.nonce))

if err = p.writeTransaction(ctx, &apitypes.TXWithStatus{
Expand Down Expand Up @@ -586,7 +587,7 @@ func (p *leveldbPersistence) UpdateTransaction(ctx context.Context, txID string,
return p.writeTransaction(ctx, tx, false)
}

func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.TXWithStatus, new bool) (err error) {
func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.TXWithStatus, newTx bool) (err error) {
// We take a write-lock here, because we are writing multiple values (the indexes), and anybody
// attempting to read the critical nonce allocation index must know the difference between a partial write
// (we crashed before we completed all the writes) and an incomplete write that's in process.
Expand All @@ -608,7 +609,7 @@ func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.
return i18n.NewError(ctx, tmmsgs.MsgPersistenceTXIncomplete)
}
idKey := txDataKey(tx.ID)
if new {
if newTx {
if tx.SequenceID != "" {
// for new transactions sequence ID should always be generated by persistence layer
// as the format of its value is persistence service specific
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/sqlpersistence.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -103,6 +103,7 @@ func (p *sqlPersistence) RichQuery() persistence.RichQuery {
}

func (p *sqlPersistence) seqAfterFilter(ctx context.Context, qf *ffapi.QueryFields, after *int64, limit int, dir txhandler.SortDirection, conditions ...ffapi.Filter) (filter ffapi.Filter) {
//nolint:gosec // Safe conversion as limit is always positive
fb := qf.NewFilterLimit(ctx, uint64(limit))
if after != nil {
if dir == txhandler.SortDirectionDescending {
Expand Down
6 changes: 4 additions & 2 deletions internal/persistence/postgres/transaction_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -98,7 +98,8 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config.
batchMaxSize := conf.GetInt(ConfigTXWriterBatchSize)
cacheSlots := conf.GetInt(ConfigTXWriterCacheSlots)
tw = &transactionWriter{
p: p,
p: p,
//nolint:gosec // Safe conversion as workerCount is always positive
workerCount: uint32(workerCount),
batchTimeout: conf.GetDuration(ConfigTXWriterBatchTimeout),
batchMaxSize: batchMaxSize,
Expand Down Expand Up @@ -354,6 +355,7 @@ func (tw *transactionWriter) assignNonces(ctx context.Context, txInsertsByFrom m
}
}
log.L(ctx).Infof("Assigned nonce %s / %d to %s", signer, cacheEntry.nextNonce, op.txInsert.ID)
//nolint:gosec // Safe conversion as nextNonce is always positive
op.txInsert.Nonce = fftypes.NewFFBigInt(int64(cacheEntry.nextNonce))
cacheEntry.nextNonce++
tw.nextNonceCache.Add(signer, cacheEntry)
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/transactions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -141,6 +141,7 @@ func (p *sqlPersistence) ListTransactionsByCreateTime(ctx context.Context, after
}

func (p *sqlPersistence) ListTransactionsByNonce(ctx context.Context, signer string, after *fftypes.FFBigInt, limit int, dir txhandler.SortDirection) ([]*apitypes.ManagedTX, error) {
//nolint:gosec // Safe conversion as limit is always positive
fb := persistence.TransactionFilters.NewFilterLimit(ctx, uint64(limit))
conditions := []ffapi.Filter{
fb.Eq("from", signer),
Expand Down
4 changes: 3 additions & 1 deletion internal/persistence/postgres/txhistory.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -153,7 +153,9 @@ func (p *sqlPersistence) buildHistorySummary(ctx context.Context, txID string, b
var lastRecordSameSubStatus *apitypes.TXHistoryRecord
for {
filter := persistence.TXHistoryFilters.
//nolint:gosec // Safe conversion as pageSize is always positive
NewFilterLimit(ctx, uint64(pageSize)).Eq("transaction", txID).
//nolint:gosec // Safe conversion as skip is always positive
Skip(uint64(skip))
page, _, err := p.txHistory.GetMany(ctx, filter)
if err != nil {
Expand Down
Loading