- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我的拓扑有问题。我尝试解释工作流程...我有一个每 2 分钟发出约 500k 元组的源,这些元组必须由 spout 读取并像单个对象一样精确处理一次(我认为是三叉戟中的一个批处理)。之后, bolt /函数/还有什么?...必须附加时间戳并将元组保存到 Redis 中。
我尝试使用一个函数实现 Trident 拓扑结构,该函数使用一个 Jedis 对象(Java 的 Redis 库)将所有元组保存到 Redis 中,但是当我部署时,我在这个对象上收到一个 NotSerializable 异常。
我的问题是。我怎样才能实现一个在 Redis 上写入这批元组的函数?在网上阅读我找不到任何从函数写入 Redis 的示例或任何使用 Trident 中的 State 对象的示例(可能我必须使用它......)
我的简单拓扑:
TridentTopology topology = new TridentTopology();
topology.newStream("myStream", new mySpout()).each(new Fields("field1", "field2"), new myFunction("redis_ip", "6379"));
提前致谢
最佳答案
(回复一般状态,因为与 Redis 相关的特定问题似乎在其他评论中已解决)
当我们牢记 Storm 从分布式(或“分区”)数据源(通过 Storm“spouts”)读取、并行处理许多节点上的数据流、选择性地执行对这些数据流进行计算(称为“聚合”)并将结果保存到分布式数据存储(称为“状态”)。聚合是一个非常广泛的术语,仅表示“计算内容”:例如,在 Storm 中计算流上的最小值被视为先前已知的最小值与当前在集群的某个节点中处理的新值的聚合。
考虑到聚合和分区的概念,我们可以看一下 Storm 中允许在状态中保存内容的两个主要原语:partitionPersist 和 persistentAggregate,第一个在每个集群节点级别运行,没有与其他分区的协调,感觉有点像通过 DAO 与 DB 对话,而第二个分区涉及“重新分区”元组(即在集群中重新分配它们,通常沿着一些 groupby 逻辑),做一些计算(一个“聚合”)在读取/保存一些东西到数据库之前,感觉有点像与 HashMap 而不是 DB 交谈(在这种情况下,Storm 将 DB 称为“MapState”,或者如果数据库中只有一个键,则称为“快照” map )。
还有一件事要记住,Storm 的exactly once 语义不是通过只处理每个元组一次来实现的:这太脆弱了,因为每个元组可能有多个读/写操作在我们的拓扑中定义的元组,出于可伸缩性原因,我们希望避免两阶段提交,并且在大规模情况下,网络分区变得更有可能。相反,Storm 通常会继续重放这些元组,直到他确定它们已被完全成功地处理至少一次。这与状态更新的重要关系是 Storm 为我们提供了允许幂等状态更新的原语 (OpaqueMap),以便这些重放不会破坏以前存储的数据。例如,如果我们对数字 [1,2,3,4,5] 进行求和,即使由于某些原因在“求和”操作中多次重放和处理,保存在 DB 中的结果始终是 15 transient 故障。 OpaqueMap 对用于在数据库中保存数据的格式有轻微影响。请注意,只有当我们告诉 Storm 这样做时,这些重播和不透明逻辑才会出现,但我们通常会这样做。
如果您有兴趣阅读更多内容,我在此处发布了 2 篇关于该主题的博客文章。
http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
http://svendvanderveken.wordpress.com/2014/02/05/error-handling-in-storm-trident-topologies/
最后一件事:正如上面的回放内容所暗示的,Storm 本质上是一种非常异步的机制:我们通常有一些数据生产者在队列系统(例如 Kafka 或 0MQ)中发布事件,Storm 从那里。因此,按照问题中的建议从 Storm 中分配时间戳可能会或可能不会产生预期的效果:此时间戳将反射(reflect)“最新成功处理时间”,而不是数据摄取时间,当然它不会完全相同在重播元组的情况下。
关于redis - 在 Redis 上写入的 Trident 或 Storm 拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21984201/
我们正在以伪模式执行 Storm 拓扑。 Storm 拓扑运行良好,能够连接 Storm UI (8080)。 但是Storm UI 没有显示正在运行的拓扑信息。 也重新启动了 Storm UI 进程
我们有一个相当简单的 Storm 拓扑,让人头疼。 我们的一个 bolt 可以发现它正在处理的数据是有效的,并且每件事都正常进行,或者它可以发现它是无效但可以修复的。在这种情况下,我们需要将其发送以进
我是 Storm 中 Trident 的新手。我对 TridentState 感到很头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 ID 来完全处理
我有以下情况: 有许多 bolt 计算不同的值 该值被发送到可视化 bolt 可视化 bolt 打开一个网络套接字并发送值以某种方式可视化 问题是,可视化 bolt 总是相同的,但它为可以作为其输入的
我正在使用 Kafka storm,kafka 向 storm 发送/发出 json 字符串,在 storm 中,我想根据 json 中的键/字段将负载分配给几个工作人员。怎么做?在我的例子中,它是
我需要使用 Storm 处理成批的元组。我的最后一个 bolt 必须等到拓扑接收到整个批次,然后才能进行一些处理。为避免混淆 - 对我来说,批处理是一组实时出现的 N 条消息,该术语不必与批处理 (H
我是 Storm 的新手..我遇到了以下错误 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannel
这是一个让我发疯的问题。我的本地 LAN 上运行着一台机器 Storm 实例。我目前正在运行 v0.9.1-incubating发布版本(来自 the Apache Incubator site。问题
我是第一次使用 Storm(从开始使用 Storm 学习),我的项目在运行时失败并出现 ClassNotFoundException: [WARNING] java.lang.ClassNotFoun
如何为 Storm 拓扑提供自定义配置?例如,如果我构建了一个连接到 MySQL 集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接的服务器,我该怎么做?我的偏好是使用配置文件,但我担心文件本
我一直在阅读 Storm并尝试使用 Storm-starter 中的示例。 我想我明白了这个概念,它非常适用于许多情况。我有一个我想做的测试项目来了解更多关于这方面的信息,但我想知道 Storm 是否
在我们的 Storm 1.0.2 应用程序中,我们面临内存不足的异常。在调试时,我们发现 Kafka spout 向 Bolt 发出了太多消息。 bolt 的运行能力几乎为 4.0。那么有没有一种方法
看完this和 this我很难理解如何配置我的三叉戟拓扑。 基本上我的 Storm 应用程序正在读取 kafka ,进行一些数据操作,最后写入 Cassandra . 这是我目前构建拓扑的方式: pr
我已经从 https://github.com/apache/incubator-storm 下载了 incubator-storm 代码.现在,我尝试使用以下命令运行 WordCountTopolo
我一直在努力理解 Storm 架构,但我不确定我是否理解正确。我会尽量准确地解释我认为的情况。请解释什么 - 如果 - 我错了,什么是对的。 初步想法: worker http://storm.apa
这是我阅读后想到的一个问题: What is the "task" in Storm parallelism 如果我需要在 bolt 的内部状态中保留一些信息,例如,在经典的单词计数用例中,将 bol
我已经使用 docker compose 安装了 Apache-Storm docker-compose.yml: kafka: image: spotify/kafka ports:
我正在围绕我的 Storm 拓扑构建一个监控服务,并希望能够获取各个时间窗口周围的失败元组数量,类似于 Storm UI 如何在 10m、3h 和 1d 窗口中显示失败元组的数量。 我的监控服务目前是
我已经在我的机器上配置了 Storm。 Zookeeper、Nimbus 和 Supervisor 运行正常。 现在我想向这个 Storm 提交一个拓扑。 我正在尝试使用 Storm jar 。 但我
我在玩 Storm,我想知道 Storm 在哪里指定(如果可能)聚合时的(翻滚/滑动)窗口大小。例如。如果我们想在 Twitter 上找到前一小时的热门话题。我们如何指定一个 bolt 应该每小时返回
我是一名优秀的程序员,十分优秀!