
apache-spark - Kinesis Stream with empty records in Google Dataproc with Spark 1.6.1 and Hadoop 2.7.2


I am trying to connect to an Amazon Kinesis Stream from Google Dataproc, but I only receive empty RDDs.

Command: spark-submit --verbose --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.2 demo_kinesis_streaming.py --awsAccessKeyId XXXXX --awsSecretKey XXXX

Verbose log: https://gist.github.com/sshrestha-datalicious/e3fc8ebb4916f27735a97e9fcc42136c

More details:
Spark 1.6.1
Hadoop 2.7.2
Assembly used: /usr/lib/spark/lib/spark-assembly-1.6.1-hadoop2.7.2.jar

Surprisingly, it worked when I downloaded and used the assembly containing Spark 1.6.1 with Hadoop 2.6.0, via the following command:

Command: SPARK_HOME=/opt/spark-1.6.1-bin-hadoop2.6 spark-submit --verbose --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.2 demo_kinesis_streaming.py --awsAccessKeyId XXXXX --awsSecretKey XXXX

I am not sure whether there is a version conflict between the two Hadoop versions and the Kinesis ASL library, or whether it is related to Google Dataproc's custom setup.

Any help would be appreciated.

Thanks,
Suman

Best Answer

Our team was in the same situation, and we managed to solve the problem:

We were running in the same environment:

  • DataProc Image Version 1 with Spark 1.6.1 with Hadoop 2.7
  • A simple Spark Streaming Kinesis script boiled down to the following:

    # Run the script as
    # spark-submit \
    #   --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 \
    #   demo_kinesis_streaming.py \
    #   --awsAccessKeyId FOO \
    #   --awsSecretKey BAR \
    # ...

    import argparse

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.storagelevel import StorageLevel

    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

    ap = argparse.ArgumentParser()
    ap.add_argument('--awsAccessKeyId', required=True)
    ap.add_argument('--awsSecretKey', required=True)
    ap.add_argument('--stream_name')
    ap.add_argument('--region')
    ap.add_argument('--app_name')
    ap = ap.parse_args()

    kinesis_application_name = ap.app_name
    kinesis_stream_name = ap.stream_name
    kinesis_region = ap.region
    kinesis_endpoint_url = 'https://kinesis.{}.amazonaws.com'.format(ap.region)

    spark_context = SparkContext(appName=kinesis_application_name)
    streamingContext = StreamingContext(spark_context, 60)

    kinesisStream = KinesisUtils.createStream(
        ssc=streamingContext,
        kinesisAppName=kinesis_application_name,
        streamName=kinesis_stream_name,
        endpointUrl=kinesis_endpoint_url,
        regionName=kinesis_region,
        initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
        checkpointInterval=60,
        storageLevel=StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId=ap.awsAccessKeyId,
        awsSecretKey=ap.awsSecretKey
    )

    kinesisStream.pprint()

    streamingContext.start()
    streamingContext.awaitTermination()
  • The code had been tested and works on AWS EMR and in local environments with the same Spark 1.6.1 and Hadoop 2.7 setup.

  • While there was data in the Kinesis stream, the script on DataProc returned empty RDDs without printing any errors.
  • We tested it on DataProc with the following setups, and none of them worked:
    1. Submitting the job via the gcloud command (roughly as sketched after this list);
    2. ssh-ing into the cluster master node and running in yarn-client mode;
    3. ssh-ing into the cluster master node and running as local[*].
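
For reference, the gcloud submission from (1) looked roughly like the following; the cluster and stream names are placeholders, not the values we actually used:

    # Hypothetical cluster/stream names; --properties pulls in the same
    # Kinesis ASL package that spark-submit --packages would resolve.
    gcloud dataproc jobs submit pyspark demo_kinesis_streaming.py \
        --cluster my-cluster \
        --properties spark.jars.packages=org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 \
        -- \
        --awsAccessKeyId FOO \
        --awsSecretKey BAR \
        --stream_name my-stream \
        --region ap-southeast-2 \
        --app_name SparkDemo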

We enabled verbose logging by updating /etc/spark/conf/log4j.properties with the following values:

    log4j.rootCategory=DEBUG, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
    log4j.logger.org.eclipse.jetty=ERROR
    log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
    log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=DEBUG
    log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=DEBUG
    log4j.logger.org.apache.spark=DEBUG
    log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=DEBUG
    log4j.logger.org.spark-project.jetty.server.handler.ContextHandler=DEBUG
    log4j.logger.org.apache=DEBUG
    log4j.logger.com.amazonaws=DEBUG

We noticed something odd in the logs (note that spark-streaming-kinesis-asl_2.10:1.6.1 declares aws-sdk-java/1.9.37 as a dependency, yet somehow aws-sdk-java/1.7.4 was in use, as suggested by the user agent):

    16/07/10 06:30:16 DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer: PROCESS task encountered execution exception:
    java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long;
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.checkAndSubmitNextTask(ShardConsumer.java:137)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.consumeShard(ShardConsumer.java:126)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:334)
        at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)

    Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long;
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:119)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
        at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    content-length:282
    content-type:application/x-amz-json-1.1
    host:kinesis.ap-southeast-2.amazonaws.com
    user-agent:SparkDemo,amazon-kinesis-client-library-java-1.4.0, aws-sdk-java/1.7.4 Linux/3.16.0-4-amd64 OpenJDK_64-Bit_Server_VM/25.91-b14/1.8.0_91
    x-amz-date:20160710T063016Z
    x-amz-target:Kinesis_20131202.GetRecords

DataProc appears to have built its own Spark with a much older AWS SDK as a dependency, and it blows up when combined with code that requires a much newer AWS SDK, although we were not sure exactly which module caused the error.
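
One way to confirm which jar actually serves the conflicting class is to ask the JVM from the PySpark driver. A minimal sketch, reusing spark_context from the script above and relying on PySpark's private _jvm gateway (a debugging aid, not a stable API):

    # Resolve the class named in the stack trace and print the jar it was loaded from.
    cls = spark_context._jvm.java.lang.Class.forName(
        'com.amazonaws.services.kinesis.model.GetRecordsResult')
    # Expected output is something like:
    # file:/usr/lib/spark/lib/spark-assembly-1.6.1-hadoop2.7.2.jar
    print(cls.getProtectionDomain().getCodeSource().getLocation().toString())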

Update: per @DennisHuo's comment, this behaviour is caused by Hadoop's leaky classpath: https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-project/pom.xml#L650
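
The leak is easy to verify on a cluster node. A quick check, assuming the default Dataproc layout and the assembly path mentioned in the question:

    # Expand Hadoop's classpath and list the bundled AWS SDK jars
    hadoop classpath --glob | tr ':' '\n' | grep -i aws-java-sdk

    # Confirm the conflicting class is baked into the Spark assembly itself
    unzip -l /usr/lib/spark/lib/spark-assembly-1.6.1-hadoop2.7.2.jar | grep GetRecordsResult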

To make things worse, AWS KCL 1.4.0 (used by Spark 1.6.1) will suppress any runtime error silently instead of throwing a RuntimeException, which caused a lot of trouble while debugging.


Eventually, our solution was to build our own org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 with all of its com.amazonaws.* classes shaded.

We built the JAR with the following pom (an updated spark/extras/kinesis-asl/pom.xml) and shipped the new JAR to spark-submit via the --jars flag (a sketch of the commands follows the pom):

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-parent_2.10</artifactId>
        <version>1.6.1</version>
        <relativePath>../../pom.xml</relativePath>
      </parent>

      <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
      <packaging>jar</packaging>
      <name>Spark Kinesis Integration</name>

      <properties>
        <sbt.project.name>streaming-kinesis-asl</sbt.project.name>
      </properties>

      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
          <version>${project.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.binary.version}</artifactId>
          <version>${project.version}</version>
          <type>test-jar</type>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
          <version>${project.version}</version>
          <type>test-jar</type>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>amazon-kinesis-client</artifactId>
          <version>${aws.kinesis.client.version}</version>
        </dependency>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>amazon-kinesis-producer</artifactId>
          <version>${aws.kinesis.producer.version}</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.mockito</groupId>
          <artifactId>mockito-core</artifactId>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.scalacheck</groupId>
          <artifactId>scalacheck_${scala.binary.version}</artifactId>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
        </dependency>
      </dependencies>

      <build>
        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>

        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
              <shadedArtifactAttached>false</shadedArtifactAttached>

              <artifactSet>
                <includes>
                  <!-- At a minimum we must include this to force effective pom generation -->
                  <include>org.spark-project.spark:unused</include>
                  <include>com.amazonaws:*</include>
                </includes>
              </artifactSet>

              <relocations>
                <relocation>
                  <pattern>com.amazonaws</pattern>
                  <shadedPattern>foo.bar.YO.com.amazonaws</shadedPattern>
                  <includes>
                    <include>com.amazonaws.**</include>
                  </includes>
                </relocation>
              </relocations>

            </configuration>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>
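
For completeness, a rough sketch of the build-and-ship steps, assuming a stock Spark 1.6.1 source checkout (the module path and the output jar name may differ in your tree):

    # From the root of the Spark source tree: build just the patched module
    mvn -pl extras/kinesis-asl -am package -DskipTests

    # Ship the shaded jar explicitly instead of resolving --packages from Maven Central
    spark-submit \
        --jars extras/kinesis-asl/target/spark-streaming-kinesis-asl_2.10-1.6.1.jar \
        demo_kinesis_streaming.py \
        --awsAccessKeyId FOO \
        --awsSecretKey BAR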

The original question and answer can be found on Stack Overflow: https://stackoverflow.com/questions/38237345/
