Skip to content

Commit

Permalink
MINOR: Deflake EligibleLeaderReplicasIntegrationTest (#18923)
Browse files Browse the repository at this point in the history
Make sure to give enough time for the partition ISR updates.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
CalvinConfluent authored Feb 20, 2025
1 parent c89fd2b commit 1eecd02
Showing 1 changed file with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import scala.collection.Seq;
import scala.collection.mutable.HashMap;

import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(String quorum) throws
}

void waitUntilOneMessageIsConsumed(Consumer consumer) {
kafka.utils.TestUtils.waitUntilTrue(
TestUtils.waitUntilTrue(
() -> {
try {
ConsumerRecords record = consumer.poll(Duration.ofMillis(100L));
Expand All @@ -212,7 +213,7 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) {
}
},
() -> "fail to consume messages",
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
DEFAULT_MAX_WAIT_MS, 100L
);
}

Expand Down Expand Up @@ -417,30 +418,39 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr(String quorum) throws E
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize > 0 && elrSize == 0;
});
topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
assertEquals(0, topicPartitionInfo.lastKnownElr().size());
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(lastKnownLeader, topicPartitionInfo.leader().id());

TestUtils.waitUntilTrue(
() -> {
try {
TopicPartitionInfo partition = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions().get(0);
if (partition.leader() == null) return false;
return partition.lastKnownElr().isEmpty() && partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader;
} catch (Exception e) {
return false;
}
},
() -> String.format("Partition metadata for %s is not correct", testTopicName),
DEFAULT_MAX_WAIT_MS, 100L
);
} finally {
restartDeadBrokers(false);
}
}

void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> isIsrAndElrSizeSatisfied) {
kafka.utils.TestUtils.waitUntilTrue(
TestUtils.waitUntilTrue(
() -> {
try {
TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo partition = topicDescription.partitions().get(0);
if (!isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size())) return false;
return isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size());
} catch (Exception e) {
return false;
}
return true;
},
() -> String.format("Partition metadata for %s is not propagated", testTopicName),
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
DEFAULT_MAX_WAIT_MS, 100L);
}
}

0 comments on commit 1eecd02

Please sign in to comment.