gpt4 book ai didi

java - Spark Streaming作业如何在Kafka主题上发送数据并将其保存在Elastic中

转载 作者:行者123 更新时间:2023-12-02 04:51:04 25 4
gpt4 key购买 nike

我正在开发一个数据分析项目,在该项目中,我从 CSV 文件中读取数据,在 Kafka 主题上遍历该数据,并使用 Spark Streaming 来使用该 Kafka 主题数据。我在单个项目中使用的所有组件。

现在,在使用 Spark Streaming 使用数据后,我必须对其进行一些计算,并且必须将数据保存到 Elasticsearch 中,并且必须将该数据发送到另一个主题。所以我正在从 Spark Streaming 做这些事情(将数据保存到弹性并将数据发送到主题)。

下面是我的代码

@Component
public class RawEventSparkConsumer implements Serializable {

@Autowired
private ElasticSearchServiceImpl dataModelServiceImpl;

@Autowired
private EventKafkaProducer enrichEventKafkaProducer;

Collection<String> topics = Arrays.asList("rawTopic");

public void sparkRawEventConsumer(JavaStreamingContext streamingContext) {

Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);

JavaInputDStream<ConsumerRecord<String, String>> rawEventRDD = KafkaUtils.createDirectStream(streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

JavaDStream<String> dStream = rawEventRDD.map((x) -> x.value());

JavaDStream<BaseDataModel> baseDataModelDStream = dStream.map(convertIntoBaseModel);
baseDataModelDStream.foreachRDD(rdd1 -> {
saveDataToElasticSearch(rdd1.collect());
});

JavaDStream<EnrichEventDataModel> enrichEventRdd = baseDataModelDStream.map(convertIntoEnrichModel);

enrichEventRdd.foreachRDD(rdd -> {
System.out.println("Inside rawEventRDD.foreachRDD = = = " + rdd.count());
sendEnrichEventToKafkaTopic(rdd.collect());
});

streamingContext.start();

try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

static Function convertIntoBaseModel = new Function<String, BaseDataModel>() {

@Override
public BaseDataModel call(String record) throws Exception {
ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel = mapper.readValue(record, BaseDataModel.class);
return csvDataModel;
}
};

static Function convertIntoEnrichModel = new Function<BaseDataModel, EnrichEventDataModel>() {

@Override
public EnrichEventDataModel call(BaseDataModel csvDataModel) throws Exception {

EnrichEventDataModel enrichEventDataModel = new EnrichEventDataModel(csvDataModel);
enrichEventDataModel.setEnrichedUserName("Enriched User");
User user = new User();
user.setU_email("Nitin.Tyagi");
enrichEventDataModel.setUser(user);
return enrichEventDataModel;
}
};

private void sendEnrichEventToKafkaTopic(List<EnrichEventDataModel> enrichEventDataModels) {
if (enrichEventKafkaProducer != null && enrichEventDataModels != null && enrichEventDataModels.size() > 0)
try {
enrichEventKafkaProducer.sendEnrichEvent(enrichEventDataModels);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

private void saveDataToElasticSearch(List<BaseDataModel> baseDataModelList) {
if(!baseDataModelList.isEmpty())
dataModelServiceImpl.saveAllBaseModel(baseDataModelList);
}
}

现在我有几个问题

1)我的方法好吗,即在 Elastic Search 中保存数据并从 Spark Streaming 发送有关主题的数据?

2)我在单个项目中使用应用程序组件(Kafka、Spark Streaming),并且有多个 Spark Streaming 类。我在本地系统中通过 CommandLineRunner 运行这些类。那么现在如何将 Spark Streaming 作为 Spark 作业提交呢?

对于 Spark Submit,我需要使用 Spark Streaming 类创建单独的项目吗?

最佳答案

Is my approach fine, i.e saving data in Elastic Search and sending it on topic from Spark Streaming?

我想我会考虑使用 ES-Hadoop Spark 库。看起来您刚刚直接使用了 Elastic Java API(假定您正在收集 RDD 分区)

虽然它可能有效,但它是高度耦合的......当 Elasticsearch 因维护而停机或高度潜在时会发生什么?整个应用程序会停止吗?

另一种方法是将 Kafka 处理逻辑拆分到自己的部署中。这样,您还可以只使用 Elasticsearch Kafka Connect 流程从主题加载数据,而无需自己编写该代码(Connect API 可能已经是您正在运行的 Kafka 集群的一部分)

there are multiple Spark Streaming classes

多个主类?这不应该是一个问题。您需要为 Spark 提交提供一个 JAR 和一个类名称。您可以在一个 jar 中拥有多个“入口点”/主要方法。

how can submit Spark Streaming as a spark job?

我不确定我是否理解这个问题。 spark-submit 适用于流作业

注意:如果您计划更改数据类型或其顺序,并且您还希望除您自己之外的任何人都可以使用该主题,则 CSV 是您可以放入 Kafka 的最糟糕的格式之一。即使 Elasticsearch 也希望您拥有 json 编码的有效负载

关于java - Spark Streaming作业如何在Kafka主题上发送数据并将其保存在Elastic中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56441306/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com