Skip to content

Commit

Permalink
refactor(config): move config check to s3stream (#2327)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Feb 21, 2025
1 parent 7da0d6b commit 7259ec9
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
17 changes: 0 additions & 17 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -761,23 +761,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
}

// configuration sanity checks
val memoryType = if (s3StreamAllocatorPolicy.isDirect) "Direct buffer" else "Heap buffer"
val memoryLimit = if (s3StreamAllocatorPolicy.isDirect) {
PlatformDependent.maxDirectMemory()
} else {
Runtime.getRuntime.maxMemory()
}
if (s3BlockCacheSize > memoryLimit) {
throw new ConfigException(s"${AutoMQConfig.S3_BLOCK_CACHE_SIZE_CONFIG} of ${s3BlockCacheSize} exceeds ${memoryType} limit of ${memoryLimit}")
}
if (s3WALCacheSize > memoryLimit) {
throw new ConfigException(s"${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize} exceeds ${memoryType} limit of ${memoryLimit}")
}
if (s3WALUploadThreshold > s3WALCacheSize) {
throw new ConfigException(s"${AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_CONFIG} of ${s3WALUploadThreshold} exceeds ${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize}")
}

(s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold)
}

Expand Down
29 changes: 29 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ConfigValidator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3;

import io.netty.util.internal.PlatformDependent;

public class ConfigValidator {

public static void validate(Config config) {
long memoryLimit = ByteBufAlloc.getPolicy().isDirect() ? PlatformDependent.maxDirectMemory() : Runtime.getRuntime().maxMemory();
long memoryRequired = config.blockCacheSize() + config.walCacheSize();
if (memoryRequired > memoryLimit) {
throw new IllegalArgumentException(String.format("blockCacheSize + walCacheSize size %s exceeds %s limit of %s", memoryRequired, ByteBufAlloc.getPolicy(), memoryLimit));
}
if (config.walUploadThreshold() > config.walCacheSize()) {
throw new IllegalArgumentException(String.format("walUploadThreshold %s exceeds walCacheSize %s", config.walUploadThreshold(), config.walCacheSize()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
this.config = config;
this.networkInboundBucket = networkInboundBucket;
this.networkOutboundBucket = networkOutboundBucket;
ConfigValidator.validate(config);
startStreamObjectsCompactions();
}

Expand Down

0 comments on commit 7259ec9

Please sign in to comment.