Add Pulsar 4.0.x smoke test
This commit adds a smoke test to verify basic functionality with the Pulsar 4.0.x client. See gh-43532
This commit is contained in:
parent
a25065e383
commit
f1c1291301
@ -0,0 +1,27 @@
|
||||
plugins {
|
||||
id "java"
|
||||
id "org.springframework.boot.docker-test"
|
||||
}
|
||||
|
||||
description = "Spring Boot Pulsar 4 smoke test"
|
||||
|
||||
configurations.all {
|
||||
resolutionStrategy.eachDependency {
|
||||
if (it.requested.group == 'org.apache.pulsar' &&
|
||||
!(it.requested.name.startsWith('pulsar-client-reactive'))) {
|
||||
it.useVersion '4.0.1'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
dockerTestImplementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-test"))
|
||||
dockerTestImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support-docker"))
|
||||
dockerTestImplementation(project(":spring-boot-project:spring-boot-testcontainers"))
|
||||
dockerTestImplementation("org.awaitility:awaitility")
|
||||
dockerTestImplementation("org.testcontainers:junit-jupiter")
|
||||
dockerTestImplementation("org.testcontainers:pulsar")
|
||||
|
||||
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar"))
|
||||
implementation(project(":spring-boot-project:spring-boot-starters:spring-boot-starter-pulsar-reactive"))
|
||||
}
|
@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Copyright 2012-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package smoketest.pulsar;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.testcontainers.containers.PulsarContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.system.CapturedOutput;
|
||||
import org.springframework.boot.test.system.OutputCaptureExtension;
|
||||
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
|
||||
import org.springframework.boot.testsupport.container.TestImage;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@Testcontainers(disabledWithoutDocker = true)
|
||||
@ExtendWith(OutputCaptureExtension.class)
|
||||
class SamplePulsarApplicationTests {
|
||||
|
||||
@Container
|
||||
@ServiceConnection
|
||||
static final PulsarContainer pulsar = TestImage.container(PulsarContainer.class);
|
||||
|
||||
abstract class PulsarApplication {
|
||||
|
||||
private final String type;
|
||||
|
||||
PulsarApplication(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
@Test
|
||||
void appProducesAndConsumesMessages(CapturedOutput output) {
|
||||
List<String> expectedOutput = new ArrayList<>();
|
||||
IntStream.range(0, 10).forEachOrdered((i) -> {
|
||||
expectedOutput.add("++++++PRODUCE %s:(%s)------".formatted(this.type, i));
|
||||
expectedOutput.add("++++++CONSUME %s:(%s)------".formatted(this.type, i));
|
||||
});
|
||||
Awaitility.waitAtMost(Duration.ofSeconds(30))
|
||||
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
@SpringBootTest
|
||||
@ActiveProfiles("smoketest.pulsar.imperative")
|
||||
class ImperativePulsarApplication extends PulsarApplication {
|
||||
|
||||
ImperativePulsarApplication() {
|
||||
super("IMPERATIVE");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
@SpringBootTest
|
||||
@ActiveProfiles("smoketest.pulsar.reactive")
|
||||
class ReactivePulsarApplication extends PulsarApplication {
|
||||
|
||||
ReactivePulsarApplication() {
|
||||
super("REACTIVE");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright 2012-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package smoketest.pulsar;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.pulsar.annotation.PulsarListener;
|
||||
import org.springframework.pulsar.core.PulsarTemplate;
|
||||
import org.springframework.pulsar.core.PulsarTopic;
|
||||
import org.springframework.pulsar.core.PulsarTopicBuilder;
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@Profile("smoketest.pulsar.imperative")
|
||||
class ImperativeAppConfig {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ImperativeAppConfig.class);
|
||||
|
||||
private static final String TOPIC = "pulsar-smoke-test-topic";
|
||||
|
||||
@Bean
|
||||
PulsarTopic pulsarTestTopic() {
|
||||
return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate<SampleMessage> template) {
|
||||
return (args) -> {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
template.send(TOPIC, new SampleMessage(i, "message:" + i));
|
||||
logger.info("++++++PRODUCE IMPERATIVE:(" + i + ")------");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@PulsarListener(topics = TOPIC)
|
||||
void consumeMessagesFromPulsarTopic(SampleMessage msg) {
|
||||
logger.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------");
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright 2012-2024 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package smoketest.pulsar;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.pulsar.reactive.client.api.MessageSpec;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.pulsar.core.PulsarTopic;
|
||||
import org.springframework.pulsar.core.PulsarTopicBuilder;
|
||||
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
|
||||
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@Profile("smoketest.pulsar.reactive")
|
||||
class ReactiveAppConfig {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactiveAppConfig.class);
|
||||
|
||||
private static final String TOPIC = "pulsar-reactive-smoke-test-topic";
|
||||
|
||||
@Bean
|
||||
PulsarTopic pulsarTestTopic() {
|
||||
return new PulsarTopicBuilder().name(TOPIC).numberOfPartitions(1).build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
ApplicationRunner sendMessagesToPulsarTopic(ReactivePulsarTemplate<SampleMessage> template) {
|
||||
return (args) -> Flux.range(0, 10)
|
||||
.map((i) -> new SampleMessage(i, "message:" + i))
|
||||
.map(MessageSpec::of)
|
||||
.as((msgs) -> template.send(TOPIC, msgs))
|
||||
.doOnNext((sendResult) -> logger
|
||||
.info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------"))
|
||||
.subscribe();
|
||||
}
|
||||
|
||||
@ReactivePulsarListener(topics = TOPIC)
|
||||
Mono<Void> consumeMessagesFromPulsarTopic(SampleMessage msg) {
|
||||
logger.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------");
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package smoketest.pulsar;
|
||||
|
||||
record SampleMessage(Integer id, String content) {
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2012-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package smoketest.pulsar;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
@SpringBootApplication
|
||||
public class SamplePulsarApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(SamplePulsarApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
spring.pulsar.consumer.subscription.initial-position=earliest
|
Loading…
x
Reference in New Issue
Block a user