gpt4 book ai didi

hadoop - 轻松将 Elasticsearch 数据导入 HDFS

转载 作者:可可西里 更新时间:2023-11-01 15:26:09 29 4
gpt4 key购买 nike

我们在内部部署 Elasticsearch 集群已经快 2 年了,我们想做一些更高级的分析,例如使用其中包含的日志数据以及其他不同的数据源。

我们的重点是 Elasticsearch 中的系统日志。每天生成约 100gb 的系统日志数据——每天都是它自己的索引。我们也有一些应用程序日志,但如果我能为系统日志解决这个问题,我就可以轻松解决其他数据移动问题。

这引出了我的问题。对于我的分析,我们使用 Spark 2.1.1 和 Python API。我想要所有的系统日志数据,比如说,在 HDFS 中保存 2 周,这样我们就可以做两件事:

  1. 通过我们的 Spark/Hadoop 集群之间的通信避免延迟
  2. 加快我们机器学习工作的速度
  3. 我想开始使用 Parquet 处理我的数据,所以如果我从 ES 中提取数据,我以后可以用它做任何我想做的事。

现在,我的问题是 - 从 ES 中提取如此大量的数据并将其放入 HDFS 的最佳方法是什么?我在 PySpark 中有一个执行一些基本查询的示例,但是当我尝试将整个索引(每天生成 100gb 的索引)拉入 RDD 时,出现内存不足错误。我已经联系了 Elasticsearch 支持,但被告知这是我需要在 Hadoop/Spark 端解决的问题,但他们不支持。

我们已经设置了“ES-Hadoop 连接器”,它确实为我提供了一些工作框架,尽管理解文档确实是一个挑战。 Hadoop 生态系统的几个组件(HIVE、Spark、Hadoop 等)都有连接器。我不确定那里是否有解决方案,或者是否有更好的事情要做。我对此很陌生,所以请原谅任何有明显答案的问题。我正在寻找一些指导和一些具体的建议(如果可能的话,指向带有设置和代码的具体示例的指针会很棒)。我的目标是:

  1. 在 HDFS 中获取大约 2 周的系统日志(我希望这是连续的 2 周)
  2. 在 Elasticsearch 系统上创建最小负载
  3. 无论有什么方法,最好能自动执行此操作,这样每天都会摄取一个新索引并删除最旧的索引。这不是一个硬性要求,但很高兴拥有。

感谢您给我的任何帮助、建议或示例。

编辑/附加信息:

我想在这里添加一些代码来解释我正在尝试做什么。这个过程需要很长时间才能完成,甚至几个小时后也没有显示任何进展,所以我想知道我是否做错了什么。

以下是我启动 Py Spark 的方式:

pyspark --jars=/sysadmin/hadoop/elasticsearch-hadoop-5.6.3/dist/elasticsearch-hadoop-5.6.3.jar --master yarn --deploy-mode client --num-executors 10 --executor-cores 4 --executor-memory 8G --driver-memory 50G

然后,我做了一些事情,我设置了 esconf,创建了 RDD,然后尝试将它作为文本保存到 HDFS:

>>> esconf = {"es.net.http.auth.user":"XXXXX","es.net.http.auth.pass":"XXXXX","es.resource":"logstash-syslog-2017.10.11", "es.query":"?q=*","es.read.field.include":"message","es.nodes":"server0005","es.net.ssl":"true"}
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=esconf)
>>> rdd.saveAsTextFile("/user/spark/logstash-syslog-2017.10.11.txt")

现在,RDD 返回,如果我从 RDD 执行 take(1),这需要一段时间,但我可以返回前 10 个结果。在那 10 个记录集上,我可以保存它,效果很好。但是,在完整的 RDD 上,这会花费很长时间。我不太确定我应该期待什么,但我无法想象在一个 10 节点的集群上,每个盒子有 64gb 的 RAM 和 8 个内核,这需要几个小时。

最佳答案

I have an example in PySpark of doing some basic queries, but when I try and pull an entire index (100gb daily generated index) into an RDD, I get out of memory errors

默认情况下,Spark 不会为您的作业分配太多内存,所以是的,当处理那么多数据时,您会遇到 OOM 错误。

以下是您应该关注的关键属性及其默认值。

  • spark.dynamicAllocation.enabled - false
  • spark.executor.instances - 2
  • spark.executor.memory - 1g
  • spark.driver.cores - 1

如果您的 Spark 作业在 YARN 集群管理下运行,您还需要考虑 YARN 容器的大小。在集群模式下运行时,Application Master 将成为 Spark 驱动程序容器。根据我的经验,除非您的 Spark 代码正在调用 collect() 以通过驱动程序发回数据,否则它本身不需要那么多内存。

我会尝试先增加执行器内存,然后再增加执行器的数量。如果启用动态分配,那么您可以考虑不指定执行程序数量,但它确实设置了一个下限。

ES-Hadoop 提供了许多连接器来提取数据,但这一切都取决于偏好。如果您了解 SQL,请使用 Hive。 Pig 比 Spark 更容易运行。 Spark 占用大量内存,在某些集群中可能无法正常工作。

您在评论中提到了 NiFi,但它仍然是一个 Java 进程,并且容易出现 OOM 错误。除非你有一个 NiFi 集群,否则在写入 HDFS 之前,你将有一个进程在某个地方通过磁盘上的 FlowFile 提取 100 GB。

如果您需要整个索引的快照,Elasticsearch 提供 HDFS support对于这样的功能。不过,我不确定那是什么数据格式,或者 Hadoop 进程是否可以读取它。

关于hadoop - 轻松将 Elasticsearch 数据导入 HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47104670/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com