diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 078beb8b2ba..397354c1b49 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -96,15 +96,17 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { PropertyMapper map = PropertyMapper.get(); Listener properties = this.properties.getListener(); map.from(properties::getAckMode).whenNonNull().to(container::setAckMode); + map.from(properties::getClientId).whenNonNull().to(container::setClientId); map.from(properties::getAckCount).whenNonNull().to(container::setAckCount); map.from(properties::getAckTime).whenNonNull().as(Duration::toMillis) .to(container::setAckTime); map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis) .to(container::setPollTimeout); - map.from(properties::getClientId).whenNonNull().to(container::setClientId); - map.from(properties::getIdleEventInterval).whenNonNull().to(container::setIdleEventInterval); - map.from(properties::getMonitorInterval).whenNonNull().to(container::setMonitorInterval); map.from(properties::getNoPollThreshold).whenNonNull().to(container::setNoPollThreshold); + map.from(properties::getIdleEventInterval).whenNonNull().as(Duration::toMillis) + .to(container::setIdleEventInterval); + map.from(properties::getMonitorInterval).whenNonNull().as(Duration::getSeconds) + .as(Number::intValue).to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).whenNonNull().to(container::setLogContainerConfig); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 54466b6f9dc..74524ba3018 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -32,6 +33,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.bind.convert.DefaultDurationUnit; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -797,6 +799,11 @@ public class KafkaProperties { */ private AckMode ackMode; + /** + * Prefix for the listener's consumer client.id property. + */ + private String clientId; + /** * Number of threads to run in the listener containers. */ @@ -807,6 +814,11 @@ public class KafkaProperties { */ private Duration pollTimeout; + /** + * Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive. + */ + private Float noPollThreshold; + /** * Number of records between offset commits when ackMode is "COUNT" or * "COUNT_TIME". @@ -819,27 +831,19 @@ public class KafkaProperties { private Duration ackTime; /** - * Prefix for the listener's consumer client.id property. + * Time between publishing idle consumer events (no data received). */ - private String clientId; + private Duration idleEventInterval; /** - * Interval (ms) between publishing idle consumer events (no data received). + * Time between checks for non-responsive consumers. If a duration suffix is not + * specified, seconds will be used. */ - private Long idleEventInterval; + @DefaultDurationUnit(ChronoUnit.SECONDS) + private Duration monitorInterval; /** - * Interval (seconds) between checks for non-responsive consumers. - */ - private Integer monitorInterval; - - /** - * Multiplier applied to pollTimeout to determine if a consumer is non-responsive. - */ - private Float noPollThreshold; - - /** - * When true, log the container configuration during initialization (INFO level). + * Whether to log the container configuration during initialization (INFO level). */ private Boolean logContainerConfig; @@ -859,6 +863,14 @@ public class KafkaProperties { this.ackMode = ackMode; } + public String getClientId() { + return this.clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + public Integer getConcurrency() { return this.concurrency; } @@ -875,6 +887,14 @@ public class KafkaProperties { this.pollTimeout = pollTimeout; } + public Float getNoPollThreshold() { + return this.noPollThreshold; + } + + public void setNoPollThreshold(Float noPollThreshold) { + this.noPollThreshold = noPollThreshold; + } + public Integer getAckCount() { return this.ackCount; } @@ -891,38 +911,22 @@ public class KafkaProperties { this.ackTime = ackTime; } - public String getClientId() { - return this.clientId; - } - - public void setClientId(String clientId) { - this.clientId = clientId; - } - - public Long getIdleEventInterval() { + public Duration getIdleEventInterval() { return this.idleEventInterval; } - public void setIdleEventInterval(Long idleEventInterval) { + public void setIdleEventInterval(Duration idleEventInterval) { this.idleEventInterval = idleEventInterval; } - public Integer getMonitorInterval() { + public Duration getMonitorInterval() { return this.monitorInterval; } - public void setMonitorInterval(Integer monitorInterval) { + public void setMonitorInterval(Duration monitorInterval) { this.monitorInterval = monitorInterval; } - public Float getNoPollThreshold() { - return this.noPollThreshold; - } - - public void setNoPollThreshold(Float noPollThreshold) { - this.noPollThreshold = noPollThreshold; - } - public Boolean getLogContainerConfig() { return this.logContainerConfig; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index c013be10eb5..802322c68e8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -254,15 +254,15 @@ public class KafkaAutoConfigurationTests { this.contextRunner.withUserConfiguration(TestConfiguration.class) .withPropertyValues("spring.kafka.template.default-topic=testTopic", "spring.kafka.listener.ack-mode=MANUAL", + "spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", - "spring.kafka.listener.type=batch", - "spring.kafka.listener.client-id=client", - "spring.kafka.listener.idle-event-interval=12345", - "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.no-poll-threshold=2.5", + "spring.kafka.listener.type=batch", + "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.jaas.enabled=true", "spring.kafka.producer.transaction-id-prefix=foo", @@ -290,6 +290,8 @@ public class KafkaAutoConfigurationTests { .isEqualTo(consumerFactory); assertThat(dfa.getPropertyValue("containerProperties.ackMode")) .isEqualTo(AckMode.MANUAL); + assertThat(dfa.getPropertyValue("containerProperties.clientId")) + .isEqualTo("client"); assertThat(dfa.getPropertyValue("containerProperties.ackCount")) .isEqualTo(123); assertThat(dfa.getPropertyValue("containerProperties.ackTime")) @@ -297,14 +299,12 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); - assertThat(dfa.getPropertyValue("containerProperties.clientId")) - .isEqualTo("client"); - assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval")) - .isEqualTo(12345L); - assertThat(dfa.getPropertyValue("containerProperties.monitorInterval")) - .isEqualTo(45); assertThat(dfa.getPropertyValue("containerProperties.noPollThreshold")) .isEqualTo(2.5f); + assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval")) + .isEqualTo(1000L); + assertThat(dfa.getPropertyValue("containerProperties.monitorInterval")) + .isEqualTo(45); assertThat(dfa.getPropertyValue("containerProperties.logContainerConfig")) .isEqualTo(Boolean.TRUE); assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f5fed9d603c..50a96ca7921 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -989,10 +989,10 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property. spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. - spring.kafka.listener.idle-event-interval= # Interval (ms) between publishing idle consumer events (no data received). - spring.kafka.listener.log-container-config= # When true, log the container configuration during initialization (INFO level). - spring.kafka.listener.monitor-interval= # Interval (seconds) between checks for non-responsive consumers. - spring.kafka.listener.no-poll-threshold= # Multiplier applied to pollTimeout to determine if a consumer is non-responsive. + spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received). + spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level). + spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used. + spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive. spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer. spring.kafka.listener.type=single # Listener type. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.