gpt4 book ai didi

Java Flink 外部源

转载 作者:太空宇宙 更新时间:2023-11-04 09:52:05 24 4
gpt4 key购买 nike

我想要一个并行的 Flink 源,它从内存中的阻塞队列中消耗数据。我的想法是让应用程序将元素推送到该队列中,然后 Flink 管道消耗并处理它们。

为此遵循的最佳模式是什么?我研究了一些 Flink 源实现(例如 Kafka、RabbitMQ 等),所有这些实现都在初始化源实例中所需的连接。我无法执行此操作(即从每个源实例中初始化队列),因为

  • 每个源实例都会创建自己的队列。
  • 需要从 Flink 外部引用队列才能将元素推送到其中。

目前,我已经提出了以下方案,但我觉得使用静态队列并不合适。

<强>1。每个 Flink 源实例从中获取其元素的队列。

public class TheQueue implements Serializable {

private static final Logger LOGGER = LoggerFactory.getLogger(TheQueue.class);

private transient static final BlockingQueue<Object> OBJECT_QUEUE = new LinkedBlockingQueue<>();

public static SerializableSupplier<Object> getObjectConsumer() {
return () -> {
return OBJECT_QUEUE.take();
}
};
}

<强>2。我的 Flink 管道摘录。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(10);
env.addSource(TestParallelSourceFunction.getInstance(TheQueue.getObjectConsumer()))

<强>3。 Flink源码函数。

public class TestParallelSourceFunction<T> extends RichParallelSourceFunction<T>{

private static final Logger LOGGER = LoggerFactory.getLogger(TestParallelSourceFunction.class);

private SerializableSupplier<T> supplier;

// initialisation code

@Override
public void run(final SourceContext<T> ctx) throws Exception {

LOGGER.info("Starting Flink source.");
isRunning = true;

while (isRunning) {
final T t = supplier.get();
if (t != null) {
ctx.collect(t);
}
}

LOGGER.info("Stopped Flink source.");
}

最佳答案

我认为,您对 Kafka 和 RabbitMQ 等消息队列系统及其在流应用程序中的作用的理解是有缺陷的。它们是存在于 Flink 之外的独立服务。 Flink 不会启动或配置它们,它只是打开连接来读取它们。

因此,我们的想法是启动一个 Kafka 集群,并为 Flink 作业和将元素推送到 Kafka 的任何应用程序提供必要的连接详细信息和主题名称。将元素推送到队列的应用程序通过 tcpip 与 Kafka 集群通信,Flink 也是如此。

关于Java Flink 外部源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54604545/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com