gpt4 book ai didi

java - 性能问题: Kafka + Storm + Trident + OpaqueTridentKafkaSpout

转载 作者:行者123 更新时间:2023-12-02 05:41:09 25 4
gpt4 key购买 nike

我们发现 Kafka + Storm + Trident + OpaqueTridentKafkaSpout 存在一些性能问题

下面提到的是我们的设置详细信息:

Storm 拓扑:

Broker broker = Broker.fromString("localhost:9092")
GlobalPartitionInformation info = new GlobalPartitionInformation()
if(args[4]){
int partitionCount = args[4].toInteger()
for(int i =0;i<partitionCount;i++){
info.addPartition(i, broker)
}
}
StaticHosts hosts = new StaticHosts(info)
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())


OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
TridentTopology topology = new TridentTopology()
Stream st = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
.each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
.parallelismHint(args[1].toInteger())
Map conf = new HashMap()
conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
conf.put(Config.TOPOLOGY_DEBUG, false)

if (args[0] == "local") {
LocalCluster cluster = new LocalCluster()
cluster.submitTopology("mytopology", conf, topology.build())
} else {
StormSubmitter.submitTopology("mytopology", conf, topology.build())
NEO4JTridentFunction.getGraphDatabaseService().shutdown()
}

我们用于 Storm 的 Storm.yaml 如下:

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "localhost"
# - "server2"
#
storm.zookeeper.port : 2999


storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"

nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
-XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
-server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
-Xloggc:logs/gc-worker-%ID%.log -verbose:gc
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
-XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
-XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
-XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"

java.library.path: "/usr/lib/jvm/jdk1.7.0_25"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
  • Kafka 中生成的每条消息的大小:11 KB
  • 每个bolt(NEO4JTridentFunction)处理数据的执行时间:500ms
  • 没有。 Storm worker 数量:1
  • Spout 的并行度提示(OpaqueTridentKafkaSpout):1
  • Bolt/Function(NEO4JTridentFunction) 的并行度提示:50

  • 我们看到 Spout 的吞吐量约为 12 条消息/秒。

  • Kafka 中生成消息的速率:150 条消息/秒

Storm和Kafka都是单节点部署。我们已经了解到 Storm 的吞吐量要高得多,但无法实现相同的效果。请建议如何调整 Storm+ Kafka + OpaqueTridentKafkaSpout 配置以实现更高的吞吐量。在这方面的任何帮助都会对我们有很大帮助。

谢谢

最佳答案

您应该将 spout 并行度设置为与上述主题的分区计数相同。默认情况下,trident 每次执行接受一批,您应该通过更改 topology.max.spout.pending 属性来增加此计数。由于 Trident 强制进行有序事务管理,因此您的执行方法(NEO4JTridentFunction)必须快速才能达到所需的解决方案。

此外,您可以使用“tridentConfig.fetchSizeBytes”,通过更改它,您可以为 spout 中的每个新发出调用摄取更多数据。

您还必须检查垃圾收集日志,它会给您有关真实情况的线索。

您可以通过添加“-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:{path}/gc-storm-worker-%ID%.log”来启用垃圾收集日志,在您的工作配置中的worker.childopts设置中。

最后但并非最不重要的一点是,如果你的年轻代比例高于正常情况,你可以使用G1GC。

关于java - 性能问题: Kafka + Storm + Trident + OpaqueTridentKafkaSpout,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24510456/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com