gpt4 book ai didi

java - 在嵌入式独立环境中测试 apache pulsar 功能

转载 作者:行者123 更新时间:2023-11-30 01:49:40 28 4
gpt4 key购买 nike

为了进行测试,我成功运行了嵌入式独立 pulsar 服务器和客户端。我还可以发送和接收消息。然而我实际上想(集成)测试函数(实现 org.apache.pulsar.functions.api.Function )。如何在嵌入式设置中注册函数?

package kic.data.stream.pulsar

import groovy.util.logging.Log
import org.apache.pulsar.PulsarStandalone
import org.apache.pulsar.PulsarStandaloneBuilder
import org.apache.pulsar.broker.PulsarService
import org.apache.pulsar.broker.ServiceConfiguration
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.ConsumerEventListener
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionType
import spock.lang.Specification

import java.util.concurrent.TimeUnit

@Log
class PulsarEmbeddedTest extends Specification {

static final String TOPIC = "hello";
static final int NUM_OF_MESSAGES = 100;
static PulsarStandalone standalone
static PulsarService pulsarService

def setupSpec() {
def configFile = new File(ClassLoader.getSystemResource("broker.conf").toURI()).getAbsolutePath()
def conf = new ServiceConfiguration(clusterName: "test-cluster", zookeeperServers: "localhost:2184")
log.info("${PulsarStandalone.properties}")
standalone = PulsarStandaloneBuilder.instance()
.withConfig(conf)
.withNoStreamStorage(true)
.build()
standalone.configFile = configFile
standalone.start()
pulsarService = new PulsarService(conf)
}

def test() {
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarService.brokerServiceUrl)
.build()

Producer<String> producer = client.newProducer(Schema.STRING)
.topic(TOPIC)
.enableBatching(false)
.create()

Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(TOPIC)
//.subscriptionInitialPosition()
.subscriptionName("test-subs-1")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.messageListener(Mesa)
.subscribe()



for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
producer.send("Hello_" + i)
}


Message<String> message
for (int i = 1; i <= NUM_OF_MESSAGES; ++i) {
// This calls blocks until a message is available.
message = consumer.receive(1, TimeUnit.SECONDS)
//log.info("Message received : ${message.getValue()}")
println("Message received : ${message.messageId}:${message.value}")

consumer.acknowledge(message)
}

producer.close()
consumer.close()
client.close()

expect:
1==1

}

def cleanupSpec() {
standalone.close()
}

}

最佳答案

您应该能够通过 Pulsar Admin API 创建 Pulsar 函数,就像创建普通 Pulsar 集群一样

PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("exclamation");
functionConfig.setInputs(Collections.singleton("input"));
functionConfig.setClassName(ExclamationFunction.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setOutput("output");
functionConfig.setJar("/tmp/my-jar.jar")

pulsarAdmin.functions().createFunction(functionConfig, functionConfig.getJar());

Apache Pulsar 项目中也有相当多的集成测试用于测试 Pulsar 功能。有基于 docker 的真正集成测试,也有单进程“集成”测试。以下是一个单进程“集成”测试的示例,您可以引用:

https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java

关于java - 在嵌入式独立环境中测试 apache pulsar 功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56515083/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com