- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试设置一个非常基本的flink作业。当我尝试运行时,出现以下错误:
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
streamExecutionEnvironment.execute("Test Job")
print()
调用时,错误消失了:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()
streamExecutionEnvironment.execute("Test Job")
print()
为什么可以解决此问题感到困惑。在引入接收器之前,流拓扑不会处理其任何运算符的想法吗?
print()
在这里充当接收器吗?任何帮助,将不胜感激。谢谢。
最佳答案
在编程语言理论中,惰性求值或按需调用是一种评估策略,它可以延迟对表达式的求值,直到需要其值为止,并且还避免了重复求值。懒惰评估的反义词是急切评估,有时也称为严格评估。
惰性评估的好处包括:
Flink programs are regular programs that implement transformations on distributed collections (e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Collections are initially created from sources (e.g., by reading from files, Kafka topics, or from local, in-memory collections). Results are returned via sinks, which may, for example, write the data to (distributed) files, or to standard output (for example, the command line terminal).
关于scala - Flink : No operators defined in streaming topology.无法执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54977290/
我正在使用 Groovy 进行一个项目,我想采用一个员工数组,这样在数组中没有经理跟随他们的下属。原因是我需要将人员添加到数据库中,我不希望分两次完成。 所以,我基本上有: 12
要缓存的数据: 100 Gb 数据 大小为 500-5000 字节的对象 平均每分钟更新/插入 1000 个对象(峰值 5000) 需要对生产和测试中的 Coherence 拓扑提出建议(与备份一起分
在所有边都处于错误方向的图上反转拓扑排序的结果是否会产生有效的拓扑顺序,就好像边在排序之前被反转一样? a -> b a -> c b -> d c -> d 可以给出 a b c d 的拓扑排序。反
使用JTS,如何找到多边形边界上距外部点最近的点? 最佳答案 查看 DistanceOp 。它返回一个坐标数组: 坐标[] pts = DistanceOp.closestPoints(poly, O
我正在尝试使用充当 Kafka 消费者的 Storm 喷口将我的数据从 Kafka 主题发送到 HBase,并将数据发送到 HBase 我在 Storm 拓扑中面临异常.... java.lang.R
我已经配置了我的机器 zookeeper、nimbus、supervisor 正常运行并且我的拓扑在 LocalCluster 中工作 LocalCluster cluster = new Local
我正在构建一个 REST API,但每次加载我的网站时,我都会收到 MongoError:拓扑已损坏。有人可以帮我解决这个问题吗?我感觉异步运行有问题。 const client = new Mong
我使用一些带有 zb 堆栈的 xbee (s2) 模块进行网状网络评估。因此,必须创建多跳环境。问题是,固件自己处理关联,并且没有像 api 提供的那样更深入地了解堆栈。为了强制数据的路径,而不干扰路
我有一个在 Node.js 中使用 Restify 和 Mongoose 构建的 REST 服务,以及一个包含大约 30.000 个常规大小文档的集合的 mongoDB。我的 Node 服务通过 pm
当我尝试创建时出现此错误: Error (E_UNKNOWN) :: Encountered an unexpected error MongoError: topology was destroye
我正在使用 storm jar 类将拓扑提交给 `nimbus'。它在本地工作正常但在远程集群上它说它无法加载主类。下面是错误 stderr: SLF4J: Class path contains m
当我尝试创建此错误时: Error (E_UNKNOWN) :: Encountered an unexpected error MongoError: topology was destroyed
我有一个内置在 node.js 中的 REST 服务,带有 Restify 和 Mongoose,还有一个 mongoDB,其中包含大约 30.000 个常规大小的文档。我的 Node 服务通过 pm
我正在寻找一种方法来设置“cassandra-topology.properties”的路径,以便 Cassandra 可以从给定路径获取此文件。有什么办法可以做到这一点吗? 谢谢,巴蒂亚 最佳答案
在尝试提交 Storm 拓扑时, ./storm jar /home/winoria1/Desktop/stormtopology.jar com.storm.StormTopology 我收到以下错
Apache Storm 如何设置新拓扑和存在一次的日志级别? 在java中我写道: import org.slf4j.Logger; import org.slf4j.LoggerFactory;
在同一台计算机上有两个节点的本地(测试)设置中(使用端口范围 47500..47501 的静态 IP 配置),“第二个”节点将不会加入集群;它发出 TcpDiscoveryJoinRequestMes
我是一名前端开发人员,试图在新的 Next 项目中拓展自己的视野,第一次学习 Node、Mongo 和 GraphQL 的服务器端。 Apollo 对我来说是最简单的入门方式,因为我已经在之前的项目中
我已将我的 MongoDB 数据库配置为单节点副本集。我可以通过 api 访问它(向它写入数据),也可以从 shell 访问它: rs0:PRIMARY> rs.status() { "set
我目前在 Windows 机器上使用 Netbeans 开发拓扑。当我以本地模式部署时: LocalCluster 集群 = new LocalCluster(); cluster.submitTop
我是一名优秀的程序员,十分优秀!