
java - flink: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants


To write data to HDFS, I added flink-connector-filesystem_2.11 to my pom so that I can use BucketingSink.
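
For context, this is roughly how such a sink is typically wired up; a minimal sketch, where the source, the HDFS path, the bucket format, and the batch size are illustrative placeholders rather than the actual job code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HdfsSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; the real job reads from Kafka.
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        // Write records under a time-bucketed directory layout on HDFS.
        BucketingSink<String> sink = new BucketingSink<>("hdfs://namenode:8020/flink/events");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH")); // one bucket per hour
        sink.setBatchSize(1024 * 1024 * 128L);                      // roll part files at 128 MB

        events.addSink(sink);
        env.execute("bucketing-sink sketch");
    }
}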

When I submit the jar to the Flink cluster, it does write some messages to HDFS at first. A few minutes later, however, the exception below is thrown.

Running

jar tvf show-event-to-kafka/target/show-event-to-kafka-1.0-SNAPSHOT.jar | grep HdfsConstants.class

shows that HdfsConstants is indeed present in the jar.

How can I fix this?

Stack trace:

TimerException{java.io.IOException: DataStreamer Exception: }
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: DataStreamer Exception:
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:695)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1413)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mafengwo.recommend.rtp</groupId>
<artifactId>parent-project</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>

<modules>
<module>quick-start</module>
<module>flink-template</module>
<module>feature-calculate</module>
<module>show-event-to-kafka</module>
<module>monitor</module>
<module>page-event-to-redis</module>
</modules>

<properties>
<applicationName>recommend-rtp</applicationName>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.6.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>

<!-- flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>${flink.version}</version>
</dependency>


<!-- json -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.9.7</version>
</dependency>

<!-- test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.1.0</version>
<scope>test</scope>
</dependency>

<!-- log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.11.1</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>

<!-- redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<!-- statistic -->
<dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
<version>3.1</version>
</dependency>

<dependency>
<groupId>com.mafengwo.recommend</groupId>
<artifactId>common</artifactId>
<version>1.30</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.6</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>

</dependencies>

<build>
<resources>
<resource>
<!-- Replace ${...} placeholders in properties and xml files with the values defined in this pom -->
<directory>src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
</resource>
</resources>

<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>
com.mafengwo.FlinkEntry
</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</plugin>
</plugins>
</build>

<distributionManagement>
<repository>
<id>nexus-releases</id>
<name>Nexus Release Repository</name>
<url>https://nexus.mfwdev.com/repository/recommend-release/</url>
</repository>

<snapshotRepository>
<id>nexus-snapshots</id>
<name>Nexus Snapshots Repository</name>
<url>https://nexus.mfwdev.com/repository/recommend-snapshots/</url>
</snapshotRepository>
</distributionManagement>

<repositories>
<repository>
<id>nexus-releases</id>
<name>Nexus Release Repository</name>
<url>https://nexus.mfwdev.com/repository/recommend-release/</url>
</repository>
</repositories>

</project>

Best Answer

I solved this by explicitly specifying the checkpoint configuration in flink-conf.yaml. The Flink version is 1.8.2.

state.backend: filesystem
state.checkpoints.dir: hdfs://<ip>:<port>/flink-checkpoints
state.savepoints.dir: hdfs://<ip>:<port>/flink-checkpoints
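
The same setup can also be expressed per job in code; a hedged sketch of the programmatic equivalent, where the HDFS address follows the placeholders above and the checkpoint interval is an arbitrary example:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Equivalent of state.backend: filesystem + state.checkpoints.dir
        env.setStateBackend(new FsStateBackend("hdfs://<ip>:<port>/flink-checkpoints"));

        // Trigger a checkpoint every 60 seconds (interval chosen for illustration)
        env.enableCheckpointing(60_000L);

        // ... build the job topology here, then:
        // env.execute("job name");
    }
}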

Regarding java - flink: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56803650/
