Skip to content

Commit

Permalink
feat: store payment status
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Jan 31, 2025
1 parent c9ab650 commit 4157137
Show file tree
Hide file tree
Showing 40 changed files with 1,318 additions and 94 deletions.
1 change: 1 addition & 0 deletions api/spec/src/billing/invoices/invoice.tsp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ model InvoiceWorkflowSettings {
workflow: OpenMeter.Billing.BillingWorkflow;
}

// TODO: Update!
/**
* InvoiceStatus describes the status of an invoice.
*/
Expand Down
9 changes: 5 additions & 4 deletions app/common/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.Apps
})
}

func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) {
func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingService billing.Service) (appstripe.Service, error) {
// TODO: remove this check after enabled by default
if !appsConfig.Enabled || db == nil {
return nil, nil
Expand All @@ -66,9 +66,10 @@ func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig confi
}

return appstripeservice.New(appstripeservice.Config{
Adapter: appStripeAdapter,
AppService: appService,
SecretService: secretService,
Adapter: appStripeAdapter,
AppService: appService,
SecretService: secretService,
BillingService: billingService,
})
}

Expand Down
2 changes: 0 additions & 2 deletions app/common/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
"github.com/openmeterio/openmeter/openmeter/billing"
billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter"
billingservice "github.com/openmeterio/openmeter/openmeter/billing/service"
Expand Down Expand Up @@ -41,7 +40,6 @@ func BillingService(
logger *slog.Logger,
db *entdb.Client,
appService app.Service,
appStripeService appstripe.Service,
billingAdapter billing.Adapter,
billingConfig config.BillingConfiguration,
customerService customer.Service,
Expand Down
56 changes: 29 additions & 27 deletions cmd/billing-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

142 changes: 142 additions & 0 deletions cmd/jobs/billing/advance/advance.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package advance

import (
"context"
"fmt"
"log/slog"

"github.com/spf13/cobra"

"github.com/openmeterio/openmeter/app/common"
"github.com/openmeterio/openmeter/cmd/jobs/internal"
"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/pkg/framework/entutils/entdriver"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
)

var namespace string
Expand All @@ -24,6 +30,142 @@ func init() {
Cmd.PersistentFlags().StringVar(&namespace, "namespace", "", "namespace the operation should be performed")
}

type autoAdvancer struct {
*billingworkerautoadvance.AutoAdvancer

Shutdown func()
}

func NewAutoAdvancer(ctx context.Context, conf appconfig.Configuration, logger *slog.Logger) (*autoAdvancer, error) {
commonMetadata := common.NewMetadata(conf, "0.0.0", "billing-advancer")

// We use a noop meter provider as we don't want to monitor cronjobs (for now)
meterProvider := sdkmetric.NewMeterProvider()
meter := meterProvider.Meter("billing-advancer")

// Initialize Postgres driver
postgresDriver, err := pgdriver.NewPostgresDriver(ctx, conf.Postgres.URL)
if err != nil {
return nil, fmt.Errorf("failed to initialize postgres driver: %w", err)
}

// Initialize Ent driver
entPostgresDriver := entdriver.NewEntPostgresDriver(postgresDriver.DB())

meterRepository := common.NewInMemoryRepository(conf.Meters)

clickhouseConn, err := common.NewClickHouse(conf.Aggregation.ClickHouse)
if err != nil {
return nil, fmt.Errorf("failed to initialize clickhouse connection: %w", err)
}

streamingConnector, err := common.NewStreamingConnector(ctx, conf.Aggregation, clickhouseConn, meterRepository, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize streaming connection: %w", err)
}

brokerOptions := common.NewBrokerConfiguration(conf.Ingest.Kafka.KafkaConfiguration, conf.Telemetry.Log, commonMetadata, logger, meter)

adminClient, err := common.NewKafkaAdminClient(conf.Ingest.Kafka.KafkaConfiguration)
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka admin client: %w", err)
}

kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, conf.Ingest.Kafka.TopicProvisionerConfig)

topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka topic provisioner: %w", err)
}

publisher, serverPublisherShutdown, err := common.NewServerPublisher(ctx, kafka.PublisherOptions{
Broker: brokerOptions,
ProvisionTopics: common.ServerProvisionTopics(conf.Events),
TopicProvisioner: topicProvisioner,
}, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize server publisher: %w", err)
}

ebPublisher, err := common.NewEventBusPublisher(publisher, conf.Events, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize event bus publisher: %w", err)
}

entitlementRegistry := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{
DatabaseClient: entPostgresDriver.Client(),
StreamingConnector: streamingConnector,
Logger: logger,
MeterRepository: meterRepository,
Publisher: ebPublisher,
})

customerService, err := common.NewCustomerService(logger, entPostgresDriver.Client(), entitlementRegistry)
if err != nil {
return nil, fmt.Errorf("failed to initialize customer service: %w", err)
}

secretService, err := common.NewUnsafeSecretService(logger, entPostgresDriver.Client())
if err != nil {
return nil, fmt.Errorf("failed to initialize secret service: %w", err)
}

appService, err := common.NewAppService(logger, entPostgresDriver.Client(), conf.Apps)
if err != nil {
return nil, fmt.Errorf("failed to initialize app service: %w", err)
}

namespaceManager, err := common.NewNamespaceManager(nil, conf.Namespace)
if err != nil {
return nil, fmt.Errorf("failed to initialize namespace manager: %w", err)
}

billingAdapter, err := billingadapter.New(billingadapter.Config{
Client: entPostgresDriver.Client(),
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize billing adapter: %w", err)
}

billingService, err := billingservice.New(billingservice.Config{
Adapter: billingAdapter,
CustomerService: customerService,
AppService: appService,
Logger: logger,
FeatureService: entitlementRegistry.Feature,
MeterRepo: meterRepository,
StreamingConnector: streamingConnector,
Publisher: ebPublisher,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize billing service: %w", err)
}

_, err = common.NewAppStripeService(logger, entPostgresDriver.Client(), conf.Apps, appService, customerService, secretService, billingService)
if err != nil {
return nil, fmt.Errorf("failed to initialize stripe app service: %w", err)
}

_, err = common.NewAppSandboxProvisioner(ctx, logger, conf.Apps, appService, namespaceManager, billingService)
if err != nil {
return nil, fmt.Errorf("failed to initialize sandbox app provisioner: %w", err)
}

a, err := billingworkerautoadvance.NewAdvancer(billingworkerautoadvance.Config{
BillingService: billingService,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize billing auto-advancer: %w", err)
}

return &autoAdvancer{
AutoAdvancer: a,
Shutdown: serverPublisherShutdown,
}, nil
}

var ListCmd = func() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Expand Down
20 changes: 10 additions & 10 deletions cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4157137

Please sign in to comment.