gpt4 book ai didi

java - 如何通过 Spark 提交 Spark Streaming 应用程序

转载 作者:行者123 更新时间:2023-12-02 05:34:22 26 4
gpt4 key购买 nike

我是 Spark 新手,对此没有太多想法。我正在开发一个应用程序,其中数据在 different-2 Kafka 主题上遍历,并且 Spark Streaming 从该主题读取数据。它是一个 SpringBoot 项目,其中有 3 个 Spark 消费者类。这些 SparkStreaming 类的工作是使用 Kafka 主题中的数据并将其发送到另一个主题。 SparkStreaming 类的代码如下-

    @Service
public class EnrichEventSparkConsumer {

Collection<String> topics = Arrays.asList("eventTopic");

public void startEnrichEventConsumer(JavaStreamingContext javaStreamingContext) {

Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);


JavaInputDStream<ConsumerRecord<String, String>> enrichEventRDD = KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

JavaDStream<String> enrichEventDStream = enrichEventRDD.map((x) -> x.value());
JavaDStream<EnrichEventDataModel> enrichDataModelDStream = enrichEventDStream.map(convertIntoEnrichModel);

enrichDataModelDStream.foreachRDD(rdd1 -> {
saveDataToElasticSearch(rdd1.collect());
});

enrichDataModelDStream.foreachRDD(enrichDataModelRdd -> {
if(enrichDataModelRdd.count() > 0) {
if(executor != null) {
executor.executePolicy(enrichDataModelRdd.collect());
}
}
});

}

static Function convertIntoEnrichModel = new Function<String, EnrichEventDataModel>() {

@Override
public EnrichEventDataModel call(String record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
EnrichEventDataModel csvDataModel = mapper.readValue(record, EnrichEventDataModel.class);
return csvDataModel;
}
};

private void saveDataToElasticSearch(List<EnrichEventDataModel> baseDataModelList) {
for (EnrichEventDataModel baseDataModel : baseDataModelList)
dataModelServiceImpl.save(baseDataModel);
}
}

我正在使用 CommandLineRunner 调用 startEnrichEventConsumer() 方法。

public class EnrichEventSparkConsumerRunner implements CommandLineRunner {

@Autowired
JavaStreamingContext javaStreamingContext;

@Autowired
EnrichEventSparkConsumer enrichEventSparkConsumer;

@Override
public void run(String... args) throws Exception {
//start Raw Event Spark Cosnumer.
JobContextImpl jobContext = new JobContextImpl(javaStreamingContext);

//start Enrich Event Spark Consumer.
enrichEventSparkConsumer.startEnrichEventConsumer(jobContext.streamingctx());
}

}

现在我想将这三个 Spark Streaming 类提交到集群上。我在某处读到,我必须先创建一个 Jar 文件,然后我可以使用 Spark-submit 命令,但我心中有一些问题 -

  1. 我应该使用这 3 个 Spark Streaming 类创建一个不同的项目吗?
  2. 到目前为止,我正在使用 CommandLineRunner 启动 SparkStreaming,那么何时提交集群,我应该在这些类中创建 main() 方法吗?

请告诉我该怎么做。提前致谢。

最佳答案

  • 不需要不同的项目。
  • 您应该创建负责 JavaStreamingContext 创建的入口点/main。
  • 使用依赖项创建 jar,这些依赖项位于一个 jar 文件中,不要忘记为所有 Spark 依赖项提供提供的范围,因为您将使用集群的库。
<小时/>

使用spark-submit命令行应用程序执行组装的Spark应用程序,如下所示:

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

对于本地提交

bin/spark-submit \
--class package.Main \
--master local[2] \
path/to/jar argument1 argument2

关于java - 如何通过 Spark 提交 Spark Streaming 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56183589/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com