gpt4 book ai didi

java - 将 KafkaProducer 集成到 Jersey REST 中

转载 作者:太空宇宙 更新时间:2023-11-04 13:26:37 26 4
gpt4 key购买 nike

我有一个示例 Hello World REST API,并且编写了一些简单的 Kafka 生产者代码。

现在,我希望我的 Kafka Producer 在每次调用 REST API 时向主题发送消息,即当有人输入 www.mywebpage.com/entry-api/test 时,我希望我的生产者向主题发送消息。

最简单的方法是每次创建新的生产者实例并发送一条消息,但这看起来很愚蠢。所以我想必须有一种方法可以将这个 Kafka 生产者实例注入(inject) Jersey 资源,以便使用服务器创建生产者并关闭一个服务器。

我读到可以通过依赖注入(inject)来做到这一点,但我迷失了我实际上应该做什么以及它是如何工作的。

我非常感谢一些指导方针,我应该如何更改我的代码,以便 Kafka 生产者的注入(inject)能够工作。

代码如下。

这是我的主要类(class):

package org.apache.kafka;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

public class MainApp {

public static void main(String[] args) throws Exception {

ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath("/");

Server jettyServer = new Server(9090);
jettyServer.setHandler(context);

ServletHolder jerseyServlet = context.addServlet(
org.glassfish.jersey.servlet.ServletContainer.class, "/*");
jerseyServlet.setInitOrder(0);

// Tells the Jersey Servlet which REST service/class to load.
jerseyServlet.setInitParameter(
"jersey.config.server.provider.classnames",
EntryApi.class.getCanonicalName());

try {
jettyServer.start();
jettyServer.join();
} finally {
jettyServer.destroy();
}

}

}

这是资源:

package org.apache.kafka;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/entry-api")
public class EntryApi {

@GET
@Path("test")
@Produces(MediaType.TEXT_PLAIN)
public void test() {
//HERE I WANT TO SEND MESSAGE WITH KAFKA PRODUCER
}
}

我还写了Kafka Producer:

package org.apache.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class SampleProducer {

private KafkaProducer<String, String> producer;

public SampleProducer() {
Properties prodProp = new Properties();

prodProp.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
prodProp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("request.required.acks", "1");

this.producer = new KafkaProducer<String, String>(prodProp);
}

public void sendMsgToTopic(String inMessage) {

StringBuilder msg = new StringBuilder(inMessage);

ProducerRecord<String, String> msgData = new ProducerRecord<String, String>("testTopic", msg.toString());
Future<RecordMetadata> rs = producer.send(msgData, new Callback() {
public void onCompletion(RecordMetadata rM, Exception e) {
System.out.println("Received ack for partition=" + rM.partition() + " offset=" + rM.offset());
}
});

try {
RecordMetadata rM = (RecordMetadata) rs.get();
msg.append(" partition=" + rM.partition() + " offset=" + rM.offset());
} catch (Exception e) {
System.out.println(e);
}
}

public void closeProducer() {
this.producer.close();
}
}

编辑:

好的,我用 Jersey 2.x 在我的沙箱上运行了它(感谢 peeskillet)。

事情是,我必须让它与 Jersey 1.18 一起工作(这是要求:()

再次感谢peeskillet,我已经可以使用这样的东西了:

    @Provider
public class KafkaProducerProvider implements InjectableProvider<Inject, Type>{

@Override
public ComponentScope getScope() {
return ComponentScope.Singleton;
}

@Override
public Injectable<SampleProducer> getInjectable(ComponentContext ic, Inject a, Type c) {
if (c.equals(SampleProducer.class)) {
return new Injectable<SampleProducer>() {

@Override
public SampleProducer getValue() {

Properties prodProp = new Properties();
prodProp.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
prodProp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProp.put("request.required.acks", "1");

return new SampleProducer(prodProp);
}

};
}

最佳答案

经过一番尝试后,似乎关闭单个服务并不像我想象的那么容易。我以为我们可以用 @PreDestroy 注释一个方法,但我想这并不那么容易

注入(inject)没问题。这确实是您所需要的

public class InjectionFeature implements Feature {

@Override
public boolean configure(FeatureContext context) {

context.register(new AbstractBinder(){
@Override
protected void configure() {
bind(SampleProducer.class)
.to(SampleProducer.class).in(Singleton.class);
}
});
return true;
}
}

您还可以使用相同的 configure 方法绑定(bind)任何其他您想要的服务。然后你只需要将功能添加到类中

jerseyServlet.setInitParameter(
"jersey.config.server.provider.classnames",
EntryApi.class.getCanonicalName() + ","
+ InjectionFeature.class.getCanonicalName());

您现在应该能够将其注入(inject)到您的资源类中

@Path("/entry-api")
public class EntryApi {

@Inject
private SampleProducer producer;

问题在于关闭,如果你想在关闭之前清理任何东西。我能找到的唯一方法是实现一个 ApplicationEventListener

public class ApplicationListener implements ApplicationEventListener {

@Inject
private ServiceLocator locator;

@Override
public void onEvent(ApplicationEvent ae) {
switch (ae.getType()) {
case DESTROY_FINISHED: {
SampleProducer producer = locator.getService(SampleProducer.class);
producer.destroy();
}
}
}

@Override
public RequestEventListener onRequest(RequestEvent re) {
return null;
}
}

在这里,您正在查找SampleProducer并自行关闭它。只需按照与上述功能相同的方式注册监听器即可。

另请参阅:

关于java - 将 KafkaProducer 集成到 Jersey REST 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32585032/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com