java - Unable to run JAR - Spark Twitter Streaming with Java


I am running Spark 2.4.3 in standalone mode on Ubuntu, and I build the JAR file with Maven. Below is the code I am trying to run, which is meant to stream data from Twitter. After Spark starts, the Spark master is at 127.0.1.1:7077. The Java version is 1.8.

package SparkTwitter.SparkJavaTwitter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;

import scala.Tuple2;
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.OAuthAuthorization;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

import com.google.common.collect.Iterables;


public class TwitterStream {

    public static void main(String[] args) {
        // Prepare the Spark configuration by setting the application name and master "local[2]", i.e. embedded mode
        final SparkConf sparkConf = new SparkConf().setAppName("Twitter Data Processing").setMaster("local[2]");
        // Create a streaming context using the Spark configuration and the duration for which messages will be batched and fed to Spark Core
        final JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Duration.apply(10000));

        // Prepare the configuration for Twitter authentication and authorization
        final Configuration conf = new ConfigurationBuilder().setDebugEnabled(false)
                .setOAuthConsumerKey("customer key")
                .setOAuthConsumerSecret("customer key secret")
                .setOAuthAccessToken("Access token")
                .setOAuthAccessTokenSecret("Access token secret")
                .build();
        // Create a Twitter authorization object from the prepared configuration containing the consumer and access keys and tokens
        final Authorization twitterAuth = new OAuthAuthorization(conf);
        // Create a data stream using the streaming context and the Twitter authorization
        final JavaReceiverInputDStream<Status> inputDStream = TwitterUtils.createStream(streamingContext, twitterAuth, new String[]{});
        // Create a new stream by filtering out the non-English tweets from the earlier stream
        final JavaDStream<Status> enTweetsDStream = inputDStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
        // Convert the stream to a pair stream with the user's screen name as key and the tweet text as value
        final JavaPairDStream<String, String> userTweetsStream =
                enTweetsDStream.mapToPair(
                        (status) -> new Tuple2<String, String>(status.getUser().getScreenName(), status.getText())
                );

        // Group the tweets for each user
        final JavaPairDStream<String, Iterable<String>> tweetsReducedByUser = userTweetsStream.groupByKey();
        // Create a new pair stream by replacing the iterable of tweets in the older pair stream with the number of tweets
        final JavaPairDStream<String, Integer> tweetsMappedByUser = tweetsReducedByUser.mapToPair(
                userTweets -> new Tuple2<String, Integer>(userTweets._1, Iterables.size(userTweets._2))
        );
        // Iterate over the stream's RDDs and print each element to the console
        tweetsMappedByUser.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) pairRDD -> {
            pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {

                @Override
                public void call(Tuple2<String, Integer> t) throws Exception {
                    System.out.println(t._1() + "," + t._2());
                }

            });
        });
        // Triggers the start of processing; nothing happens if the streaming context is not started
        streamingContext.start();
        // Keeps the processing alive by blocking here unless terminated manually
        //streamingContext.awaitTermination();

    }

}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkTwitter</groupId>
    <artifactId>SparkJavaTwitter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>SparkJavaTwitter</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-twitter -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-twitter_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

    </dependencies>
</project>

To execute the code, I use the following command:

./bin/spark-submit --class SparkTwitter.SparkJavaTwitter.TwitterStream /home/hadoop/eclipse-workspace/SparkJavaTwitter/target/SparkJavaTwitter-0.0.1-SNAPSHOT.jar

Below is the output I get:

19/11/10 22:17:58 WARN Utils: Your hostname, hadoop-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
19/11/10 22:17:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/11/10 22:17:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Warning: Failed to load SparkTwitter.SparkJavaTwitter.TwitterStream: twitter4j/auth/Authorization
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

I have been running a word-count program the same way and it works fine. The JAR also builds successfully. Do I need to specify more arguments when running the JAR?

Best Answer

I have run into a similar problem and found that you need to supply the dependency jars to spark-submit directly. What I did was use the --jars "<path-to-jars>/*" spark-submit option to point at the directory where the jars used to build the project are stored.

It may not be the best option, but it works...

Also note that when you update dependency versions, the jars in that folder must be updated as well.
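As a sketch of the advice above, the invocation for this question would look roughly like the following; /home/hadoop/jars is a hypothetical directory, so substitute wherever your dependency jars actually live:

```shell
# Sketch only: /home/hadoop/jars is a placeholder directory holding the
# project's dependency jars (twitter4j, spark-streaming-twitter, etc.).
# --jars puts every jar in that directory on the driver and executor classpath.
./bin/spark-submit \
  --class SparkTwitter.SparkJavaTwitter.TwitterStream \
  --jars "/home/hadoop/jars/*" \
  /home/hadoop/eclipse-workspace/SparkJavaTwitter/target/SparkJavaTwitter-0.0.1-SNAPSHOT.jar
```

One way to populate such a folder is Maven's `mvn dependency:copy-dependencies`, which copies the project's declared dependencies into target/dependency.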

For java - Unable to run JAR - Spark Twitter Streaming with Java, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58793581/
