Polish "Add support for additional Kafka listener properties"

Closes gh-11502
This commit is contained in:
Stephane Nicoll 2018-01-05 14:35:04 +01:00
parent 6fcbf80b31
commit c4db22007c
4 changed files with 58 additions and 52 deletions

View File

@ -96,15 +96,17 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
PropertyMapper map = PropertyMapper.get(); PropertyMapper map = PropertyMapper.get();
Listener properties = this.properties.getListener(); Listener properties = this.properties.getListener();
map.from(properties::getAckMode).whenNonNull().to(container::setAckMode); map.from(properties::getAckMode).whenNonNull().to(container::setAckMode);
map.from(properties::getClientId).whenNonNull().to(container::setClientId);
map.from(properties::getAckCount).whenNonNull().to(container::setAckCount); map.from(properties::getAckCount).whenNonNull().to(container::setAckCount);
map.from(properties::getAckTime).whenNonNull().as(Duration::toMillis) map.from(properties::getAckTime).whenNonNull().as(Duration::toMillis)
.to(container::setAckTime); .to(container::setAckTime);
map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis) map.from(properties::getPollTimeout).whenNonNull().as(Duration::toMillis)
.to(container::setPollTimeout); .to(container::setPollTimeout);
map.from(properties::getClientId).whenNonNull().to(container::setClientId);
map.from(properties::getIdleEventInterval).whenNonNull().to(container::setIdleEventInterval);
map.from(properties::getMonitorInterval).whenNonNull().to(container::setMonitorInterval);
map.from(properties::getNoPollThreshold).whenNonNull().to(container::setNoPollThreshold); map.from(properties::getNoPollThreshold).whenNonNull().to(container::setNoPollThreshold);
map.from(properties::getIdleEventInterval).whenNonNull().as(Duration::toMillis)
.to(container::setIdleEventInterval);
map.from(properties::getMonitorInterval).whenNonNull().as(Duration::getSeconds)
.as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).whenNonNull().to(container::setLogContainerConfig); map.from(properties::getLogContainerConfig).whenNonNull().to(container::setLogContainerConfig);
} }

View File

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -32,6 +33,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.bind.convert.DefaultDurationUnit;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@ -797,6 +799,11 @@ public class KafkaProperties {
*/ */
private AckMode ackMode; private AckMode ackMode;
/**
* Prefix for the listener's consumer client.id property.
*/
private String clientId;
/** /**
* Number of threads to run in the listener containers. * Number of threads to run in the listener containers.
*/ */
@ -807,6 +814,11 @@ public class KafkaProperties {
*/ */
private Duration pollTimeout; private Duration pollTimeout;
/**
* Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
*/
private Float noPollThreshold;
/** /**
* Number of records between offset commits when ackMode is "COUNT" or * Number of records between offset commits when ackMode is "COUNT" or
* "COUNT_TIME". * "COUNT_TIME".
@ -819,27 +831,19 @@ public class KafkaProperties {
private Duration ackTime; private Duration ackTime;
/** /**
* Prefix for the listener's consumer client.id property. * Time between publishing idle consumer events (no data received).
*/ */
private String clientId; private Duration idleEventInterval;
/** /**
* Interval (ms) between publishing idle consumer events (no data received). * Time between checks for non-responsive consumers. If a duration suffix is not
* specified, seconds will be used.
*/ */
private Long idleEventInterval; @DefaultDurationUnit(ChronoUnit.SECONDS)
private Duration monitorInterval;
/** /**
* Interval (seconds) between checks for non-responsive consumers. * Whether to log the container configuration during initialization (INFO level).
*/
private Integer monitorInterval;
/**
* Multiplier applied to pollTimeout to determine if a consumer is non-responsive.
*/
private Float noPollThreshold;
/**
* When true, log the container configuration during initialization (INFO level).
*/ */
private Boolean logContainerConfig; private Boolean logContainerConfig;
@ -859,6 +863,14 @@ public class KafkaProperties {
this.ackMode = ackMode; this.ackMode = ackMode;
} }
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public Integer getConcurrency() { public Integer getConcurrency() {
return this.concurrency; return this.concurrency;
} }
@ -875,6 +887,14 @@ public class KafkaProperties {
this.pollTimeout = pollTimeout; this.pollTimeout = pollTimeout;
} }
public Float getNoPollThreshold() {
return this.noPollThreshold;
}
public void setNoPollThreshold(Float noPollThreshold) {
this.noPollThreshold = noPollThreshold;
}
public Integer getAckCount() { public Integer getAckCount() {
return this.ackCount; return this.ackCount;
} }
@ -891,38 +911,22 @@ public class KafkaProperties {
this.ackTime = ackTime; this.ackTime = ackTime;
} }
public String getClientId() { public Duration getIdleEventInterval() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public Long getIdleEventInterval() {
return this.idleEventInterval; return this.idleEventInterval;
} }
public void setIdleEventInterval(Long idleEventInterval) { public void setIdleEventInterval(Duration idleEventInterval) {
this.idleEventInterval = idleEventInterval; this.idleEventInterval = idleEventInterval;
} }
public Integer getMonitorInterval() { public Duration getMonitorInterval() {
return this.monitorInterval; return this.monitorInterval;
} }
public void setMonitorInterval(Integer monitorInterval) { public void setMonitorInterval(Duration monitorInterval) {
this.monitorInterval = monitorInterval; this.monitorInterval = monitorInterval;
} }
public Float getNoPollThreshold() {
return this.noPollThreshold;
}
public void setNoPollThreshold(Float noPollThreshold) {
this.noPollThreshold = noPollThreshold;
}
public Boolean getLogContainerConfig() { public Boolean getLogContainerConfig() {
return this.logContainerConfig; return this.logContainerConfig;
} }

View File

@ -254,15 +254,15 @@ public class KafkaAutoConfigurationTests {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.kafka.template.default-topic=testTopic", .withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.ack-mode=MANUAL",
"spring.kafka.listener.client-id=client",
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-count=123",
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.concurrency=3",
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.type=batch",
"spring.kafka.listener.client-id=client",
"spring.kafka.listener.idle-event-interval=12345",
"spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.no-poll-threshold=2.5",
"spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true", "spring.kafka.listener.log-container-config=true",
"spring.kafka.jaas.enabled=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.producer.transaction-id-prefix=foo",
@ -290,6 +290,8 @@ public class KafkaAutoConfigurationTests {
.isEqualTo(consumerFactory); .isEqualTo(consumerFactory);
assertThat(dfa.getPropertyValue("containerProperties.ackMode")) assertThat(dfa.getPropertyValue("containerProperties.ackMode"))
.isEqualTo(AckMode.MANUAL); .isEqualTo(AckMode.MANUAL);
assertThat(dfa.getPropertyValue("containerProperties.clientId"))
.isEqualTo("client");
assertThat(dfa.getPropertyValue("containerProperties.ackCount")) assertThat(dfa.getPropertyValue("containerProperties.ackCount"))
.isEqualTo(123); .isEqualTo(123);
assertThat(dfa.getPropertyValue("containerProperties.ackTime")) assertThat(dfa.getPropertyValue("containerProperties.ackTime"))
@ -297,14 +299,12 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.isEqualTo(2000L); .isEqualTo(2000L);
assertThat(dfa.getPropertyValue("containerProperties.clientId"))
.isEqualTo("client");
assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval"))
.isEqualTo(12345L);
assertThat(dfa.getPropertyValue("containerProperties.monitorInterval"))
.isEqualTo(45);
assertThat(dfa.getPropertyValue("containerProperties.noPollThreshold")) assertThat(dfa.getPropertyValue("containerProperties.noPollThreshold"))
.isEqualTo(2.5f); .isEqualTo(2.5f);
assertThat(dfa.getPropertyValue("containerProperties.idleEventInterval"))
.isEqualTo(1000L);
assertThat(dfa.getPropertyValue("containerProperties.monitorInterval"))
.isEqualTo(45);
assertThat(dfa.getPropertyValue("containerProperties.logContainerConfig")) assertThat(dfa.getPropertyValue("containerProperties.logContainerConfig"))
.isEqualTo(Boolean.TRUE); .isEqualTo(Boolean.TRUE);
assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true);

View File

@ -989,10 +989,10 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property. spring.kafka.listener.client-id= # Prefix for the listener's consumer client.id property.
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers. spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.idle-event-interval= # Interval (ms) between publishing idle consumer events (no data received). spring.kafka.listener.idle-event-interval= # Time between publishing idle consumer events (no data received).
spring.kafka.listener.log-container-config= # When true, log the container configuration during initialization (INFO level). spring.kafka.listener.log-container-config= # Whether to log the container configuration during initialization (INFO level).
spring.kafka.listener.monitor-interval= # Interval (seconds) between checks for non-responsive consumers. spring.kafka.listener.monitor-interval= # Time between checks for non-responsive consumers. If a duration suffix is not specified, seconds will be used.
spring.kafka.listener.no-poll-threshold= # Multiplier applied to pollTimeout to determine if a consumer is non-responsive. spring.kafka.listener.no-poll-threshold= # Multiplier applied to "pollTimeout" to determine if a consumer is non-responsive.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer. spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type. spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete. spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.