gpt4 book ai didi

Elasticsearch-hadoop & Elasticsearch-spark sql - 语句扫描和滚动的跟踪

转载 作者:行者123 更新时间:2023-11-29 02:52:26 26 4
gpt4 key购买 nike

我们正在尝试将 ES(1.7.2,4 节点集群)与 Spark(1.5.1,使用 hive 和 hadoop 与 scala 2.11,4 节点集群编译)集成,hdfs 进入等式(hadoop 2.7,4节点)和 thrift jdbc 服务器和 elasticsearch-hadoop-2.2.0-m1.jar

因此,在ES上有两种执行语句的方式。

  1. 使用 scala 启动 SQL

    val conf = new  SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g")
    conf.set("spark.logConf", "true")
    conf.set("spark.cores.max","20")
    conf.set("es.index.auto.create", "false")
    conf.set("es.batch.size.bytes", "100mb")
    conf.set("es.batch.size.entries", "10000")
    conf.set("es.scroll.size", "10000")
    conf.set("es.nodes", "node2:39200")
    conf.set("es.nodes.discovery","true")
    conf.set("pushdown", "true")

    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar")
    sc.addJar("executorLib/scala-library-2.10.1.jar")

    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')" )

    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'")
    .....
  2. Thrift 服务器(代码在 spark 上执行)

    ....
    polledDataSource = new ComboPooledDataSource()
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver")
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001")
    polledDataSource.setMaxPoolSize(5)
    dbConnection = polledDataSource.getConnection
    dbStatement = dbConnection.createStatement

    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')")

    dbStatement.setFetchSize(50000)
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6")
    .....

我有以下问题,由于它们相互关联,我决定将它们打包成堆栈中的一个问题:

  1. 似乎使用 Spark SQL 的方法支持 WHERE 后面的内容下推(无论是否指定 es.query),执行时间是相同的并且可以接受。但是解决方案编号 1 绝对不支持聚合函数的 pushdow,即呈现的 count(*) 不在 ES 端执行,而是仅在检索所有数据之后执行 - ES 返回行并且 Spark SQL 对它们进行计数。请确认这是否是正确的行为

  2. 第一个解决方案的行为很奇怪,无论下推传递的是真还是假,时间都是相等的

  3. 解决方案 2 似乎不支持下推,无论我尝试以何种方式指定子查询都无关紧要,无论是表定义的一部分还是语句的 WHERE 子句中,它似乎是只需获取所有巨大的索引,然后对其进行数学计算。是不是thrift-hive无法在ES上做下推

  4. 我想在 Elasticsearch 中跟踪查询,我设置如下:

    //logging.yml
    index.search.slowlog: TRACE, index_search_slow_log_file
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file

    additivity:
    index.search.slowlog: true
    index.indexing.slowlog: true

所有的index.search.slowlog.threshold.query,index.search.slowlog.threshold.fetch甚至index.indexing.slowlog.threshold.index都设置为0ms。我确实在 slowlog 文件中看到从 sense 执行的公共(public)语句(所以它有效)。但是我没有看到针对 ES 执行的 Spark SQL 或 thrift 语句。我想这些是扫描和滚动语句,因为如果我从感觉上执行扫描和滚动,这些也不会被记录。是否有可能以某种方式在 ES 的一侧跟踪扫描和滚动?

最佳答案

  1. 据我所知,这是一种预期的行为。我知道的所有消息来源都以完全相同的方式运行,并且直觉上它是有道理的。 SparkSQL 专为分析查询而设计,在本地获取数据、缓存和处理更有意义。另见 Does spark predicate pushdown work with JDBC?

  2. 我认为 conf.set("pushdown", "true") 根本没有任何效果。如果你想配置特定于连接的设置,它应该像第二种情况一样作为 OPTION 映射传递。使用 es 前缀也应该有效

  3. 这确实很奇怪。 Martin Senne报道a similar issue使用 PostgreSQL,但我无法重现。

关于Elasticsearch-hadoop & Elasticsearch-spark sql - 语句扫描和滚动的跟踪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33687691/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com