- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
当将 Storm 作业提交到 Hadoop 集群以使用 hdfsbolt 写入 hdfs 时,不会在 Storm UI 中创建拓扑。显示错误是因为代码中使用了一些包(org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?])。
错误:
42608 [Thread-20-bolt-executor[3 3]] INFO o.a.s.util - Async loop interrupted!
42608 [Thread-19-disruptor-executor[3 3]-send-queue] INFO o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor bolt:[3 3]
42608 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor __acker:[1 1]
42608 [Thread-22-__acker-executor[1 1]] INFO o.a.s.util - Async loop interrupted!
42608 [Thread-21-disruptor-executor[1 1]-send-queue] INFO o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor __acker:[1 1]
42608 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor __system:[-1 -1]
42608 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.util - Async loop interrupted!
42608 [Thread-23-disruptor-executor[-1 -1]-send-queue] INFO o.a.s.util - Async loop interrupted!
42609 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor __system:[-1 -1]
42609 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor kafka_spout:[5 5]
42609 [Thread-25-disruptor-executor[5 5]-send-queue] INFO o.a.s.util - Async loop interrupted!
42609 [Thread-26-kafka_spout-executor[5 5]] INFO o.a.s.util - Async loop interrupted!
42611 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor kafka_spout:[5 5]
42611 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor forwardToKafka:[4 4]
42611 [Thread-28-forwardToKafka-executor[4 4]] INFO o.a.s.util - Async loop interrupted!
42611 [Thread-27-disruptor-executor[4 4]-send-queue] INFO o.a.s.util - Async loop interrupted!
42612 [SLOT_1024] ERROR o.a.s.d.s.Slot - Error when processing event
java.lang.NullPointerException: null
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.executor$fn__9739.invoke(executor.clj:878) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.executor$mk_executor$reify__9530.shutdown(executor.clj:437) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify__10167$shutdown_STAR___10187.invoke(worker.clj:684) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify$reify__10213.shutdown(worker.clj:724) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:69) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:752) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
42612 [SLOT_1024] ERROR o.a.s.u.Utils - Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
at org.apache.storm.utils.Utils.exitProcess(Utils.java:1814) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:796) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
以下是使用的Java代码。这是主要的拓扑文件。数据从 Kafka 采集,通过 hdfsbolt 发送到 hdfs。部分数据存储在 hdfs 中,但所有工作节点都没有工作,而且拓扑也没有在 Storm UI 中创建。
Java:
package hdpstrm.hdpstrm;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout. * ;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Values;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import hdpstrm.hdpstrm.printBolt;
public class MyMain {
private static HdfsBolt createHdfsBolt() {
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat();
fileNameFormat.withPath("/user/march26");
fileNameFormat.withPrefix("upcse"); //Files end with the following suffix
fileNameFormat.withExtension(".csv");
return new HdfsBolt().withFsUrl("hdfs://localhost:8020").withFileNameFormat(fileNameFormat).withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
}
public static void main(String[] args) throws Exception {
Config config = new Config();
config.setDebug(false);
config.setNumWorkers(1);
config.put("ALLFILE", (Object)"/home/kdx/out2.txt");
TopologyBuilder tp = new TopologyBuilder();
String kafka_bootstrap = "localhost:6667";
String kafka_topic = args[0];
Builder < String,
String > kafka_config = KafkaSpoutConfig.builder(kafka_bootstrap, kafka_topic).setGroupId("group_id");
kafka_config.build().getKafkaProps().keySet();
KafkaSpout < String,
String > kafka_spout = new KafkaSpout < String,
String > (kafka_config.build());
tp.setSpout("kafka_spout", kafka_spout, 1);
tp.setBolt("bolt", new printBolt()).shuffleGrouping("kafka_spout");
tp.setBolt("forwardToKafka", createHdfsBolt(), 1).shuffleGrouping("bolt");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafkaTopology", config, tp.createTopology());
//Wait for 40 seconds
Thread.sleep(40000);
//Stop the topology
cluster.shutdown();
System.out.println(" ******** TERMINATED THE LOCAL CLUSTER *********");
StormSubmitter.submitTopologyWithProgressBar("MyMain", config, tp.createTopology());
}
}
预期的结果是在 Storm UI 中创建拓扑,并确保在运行 Storm jar 时所有工作节点的参与。
最佳答案
错误是由于 storm-hdfs 中的错误。
只有当轮换策略是 TimedRotationPolicy 时才会初始化该变量,而您的不是。
您可以在 https://issues.apache.org/jira 提交错误报告.也欢迎 PR 在 https://github.com/apache/storm .
关于java - 未在 Storm UI 中创建 Storm 拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55410281/
是否可以可视化kubernetes拓扑并看到它在添加/删除/链接对象时实时更新? 我在https://www.youtube.com/watch?v=38SNQPhsGBk上观看了一个视频,其中服务/
我在搜索 Rack 拓扑时发现了这个问题...可能是hadoop认证问题: 您的群集在三个不同的 Rack 中都有从属节点,并且您编写了一个 Rack 拓扑脚本,以将每台计算机分别识别为位于 Rack
我拿了sticky force layout示例并尝试添加额外链接并使用 enter() 更新布局,但随后所有链接都消失了,FireBug 也没有显示任何错误。 这些行不应该添加链接吗? graph.
我需要编写一个订单管理器,将客户(股票、外汇等)订单发送到适当的交易所。客户想要发送订单,但对 FIX 或其他专有协议(protocol)一无所知,只知道发送订单的内部(规范化)格式。我有应用程序(服
我正在尝试使用 Eclipse 在远程主机上提交 Storm 拓扑。 这是我的代码: Config conf = new Config(); conf.setDebug(false); conf.se
这是一个名为 mininet 的流行网络模拟器的拓扑文件 我创建了一个 MultiSwitch() 类,我想将其传递给我的 Topology 类以用作默认开关 有没有办法做到这一点?我对Python不
我从 cat/proc/cpuinfo 中了解到,我正在使用 Intel(R) Core(TM) i5 CPU M 560 @ 2.67GHz。但是我想知道确切的层次结构,比如有多少个套接字,每个套接
我正在学习storm。我对我们可以在 Apache Storm 上一次运行的拓扑数量有疑问。我已经在 Storm 集群上提交了两个拓扑,但一次只运行了一个拓扑。我需要杀死或停用已经存在的拓扑拓扑以运行
我正在尝试理解 topology of queues并交换 MT 在 RabbitMQ 中创建的。 我不能得到这两个陈述: we generate an exchange for each queue
我正在寻找一种方法来测试 Kafka Streams 应用程序。这样我就可以定义输入事件,测试套件会向我显示输出。 如果没有真正的 Kafka 设置,这可能吗? 最佳答案 更新 Kafka 1.1.0
我正在使用 Java 类将拓扑提交到 Storm 集群,我还计划使用 Java 类来终止拓扑。但根据 Storm documentation ,以下命令用于终止拓扑并且没有 Java 方法(这是有正当
Storm jar storm-starter-topologies-0.10.0-beta1.jar storm-starter-master.jar生产拓扑本地 我遇到了错误: Running:
我正在编写一个 dockerized Java Spring 应用程序,该应用程序使用 Apache Storm v1.1.2、Kafka v0.11.0.1、Zookeeper 3.4.6、Eure
我在 jts 拓扑库中有一些多边形。如果我想在 javafx Pane 上绘图,我会这样做: Polygon poly=new Polygon();//javafx //g is geometry
我需要在 Java GUI 应用程序中动态绘制(星形)拓扑。通过星形拓扑,我的意思是这样的: (来源:thebryantadvantage.com) 不需要太花哨,但我不想做得太丑陋和粗糙。我所说的动
我想自动化设置 Mininet 的过程虚拟机,通过 SSH 连接到 VM,在 VM 中启动 Mininet,并初始化拓扑。我需要 session 保持打开状态,以便我可以使用创建的网络向 Minine
我正在尝试重新平衡使用 KafkaSpout 的 Storm 拓扑。我的代码是: TopologyBuilder builder = new TopologyBuilder(); Pr
标题几乎说明了一切,我有一些 Storm 拓扑,我想测量它们的延迟,即来自 Kafka 的消息与最终相关执行的最后一点之间的时间量 bolt 。如果我可以深入研究结果以查看每个 bolt 之间的延迟,
假设我想让一些转换“A”可配置。此转换使用状态存储管理某些状态,并且还需要重新分区,这意味着仅在配置后才会进行重新分区。现在,如果我按照以下方式(或任何其他组合)运行应用程序 3 次(也可能是滚动升级
我目前正在尝试实现与 R 语言集成的 Storm 拓扑。 作为起点,我采用了以下项目 ( https://github.com/allenday/R-Storm ),它通过扩展 ShellBolt 类
我是一名优秀的程序员,十分优秀!