gpt4 book ai didi

java - 如何向flink CEP数据流添加新事件?

转载 作者:行者123 更新时间:2023-12-02 10:55:31 26 4
gpt4 key购买 nike

我正在使用 flink 1.5.2 来解决 CEP 问题。

我的数据来自列表,其他一些进程会在系统运行时将新的事件对象添加到该列表中。它不是套接字或网络消息。我一直在阅读官方网站的示例。以下是我认为应该执行的步骤。

  1. 使用 env.fromCollection(list) 创建数据流;
  2. 定义 Pattern 模式
  3. 使用 CEP.pattern(data_stream, pattern) 获取 PatternStream
  4. 使用pattern_stream.select( ...implement select interface ...) 将复杂事件结果作为DataStream获取

但是我的输入流应该是无界的。我在 DataStream<> 对象中没有找到任何 add() 方法。我该如何实现这个目标?另外,我是否需要告诉 DataStream<> 何时清理过时的事件?

最佳答案

只有在使用预先固定的有界输入集时(例如编写测试或只是进行实验时),集合才适合作为 Flink 的输入源。如果您想要无限制的流,则需要选择不同的源,例如套接字或消息队列系统(如 Kafka)。

套接字很容易用于实验。在 Linux 和 MacOS 系统上您可以使用

nc -lk 9999

创建一个 Flink 可以绑定(bind)到端口 9999 的套接字,并且您提供给 nc (netcat) 作为输入的任何内容都将一次一行地流式传输到您的 Flink 作业中。 Netcat 也适用于 Windows,但未预安装。

但是,您不应该计划在生产中使用套接字,因为它们无法回滚(这对于在故障恢复期间使用 Flink 获得准确的结果至关重要)。

关于java - 如何向flink CEP数据流添加新事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51782767/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com