
scala - Spark Streaming: error in fileStream()


I am trying to implement a Spark Streaming application in Scala. I want to use the fileStream() method so that it processes newly arriving files as well as files already present in the Hadoop directory.

I followed the fileStream() implementation from these two Stack Overflow threads:

  • Scala Spark streaming fileStream
  • spark streaming fileStream

I use fileStream() as follows:

    val linesRDD = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory, (t: org.apache.hadoop.fs.Path) => true, false).map(_._2.toString)

But I get the following error message:
    type arguments [org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,
    org.apache.hadoop.mapred.TextInputFormat] conform to the bounds of none of the overloaded alternatives of value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
    org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
    [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory:
    String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V],
    implicit evidence$11: scala.reflect.ClassTag[F])
    org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
    org.apache.spark.streaming.dstream.InputDStream[(K, V)]

    wrong number of type parameters for overloaded method value fileStream with alternatives:
    [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$12: scala.reflect.ClassTag[K], implicit evidence$13: scala.reflect.ClassTag[V], implicit evidence$14: scala.reflect.ClassTag[F])
    org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path ⇒ Boolean, newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[K], implicit evidence$10: scala.reflect.ClassTag[V], implicit evidence$11: scala.reflect.ClassTag[F])
    org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
    [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$6: scala.reflect.ClassTag[K], implicit evidence$7: scala.reflect.ClassTag[V], implicit evidence$8: scala.reflect.ClassTag[F])
    org.apache.spark.streaming.dstream.InputDStream[(K, V)]

I am using Spark 1.4.1 with Hadoop 2.7.1. Before posting this question I went through the different implementations discussed on Stack Overflow and also consulted the documentation, but nothing helped. Any help would be appreciated.

Thanks,
Rajnish.

Best Answer

Please find the correct imports in the sample Java code below; it works fine for me. The key one is TextInputFormat: it must come from the new-API package org.apache.hadoop.mapreduce.lib.input, not from the old org.apache.hadoop.mapred package. fileStream() requires F <: org.apache.hadoop.mapreduce.InputFormat[K,V], and that is exactly the bound the old-API class fails in your error message.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; // new API, not org.apache.hadoop.mapred
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    import scala.Tuple2; // needed by the map() below; missing from the original listing

    // SparkUtils.getStreamingContext() is the answerer's own helper that builds a
    // JavaStreamingContext from an existing JavaSparkContext (jsc).
    JavaStreamingContext jssc = SparkUtils.getStreamingContext("key", jsc);
    // JavaDStream<String> rawInput = jssc.textFileStream(inputPath);

    JavaPairInputDStream<LongWritable, Text> inputStream = jssc.fileStream(
            inputPath, LongWritable.class, Text.class,
            TextInputFormat.class, new Function<Path, Boolean>() {
                @Override
                public Boolean call(Path v1) throws Exception {
                    if (v1.getName().contains("COPYING")) {
                        // This eliminates staging files.
                        return Boolean.FALSE;
                    }
                    return Boolean.TRUE;
                }
            }, true);

    JavaDStream<String> rawInput = inputStream.map(
            new Function<Tuple2<LongWritable, Text>, String>() {
                @Override
                public String call(Tuple2<LongWritable, Text> v1) throws Exception {
                    // Keep only the line text; drop the byte-offset key.
                    return v1._2().toString();
                }
            });
    // log, tracePrefix, windowInterval and slideInterval are fields of the answerer's surrounding class.
    log.info(tracePrefix + "Created the stream, Window Interval: " + windowInterval + ", Slide interval: " + slideInterval);
    rawInput.print();
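
For the original Scala call, the same fix applies: take TextInputFormat from the new-API package. Below is a minimal sketch, assuming Spark 1.4's StreamingContext; the batch interval, app name, and input path are placeholders, and the filter mirrors the staging-file check from the Java answer above:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    // New-API input format; importing org.apache.hadoop.mapred.TextInputFormat
    // instead is what produces the "conform to the bounds" error.
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("FileStreamExample")
    val ssc = new StreamingContext(sparkConf, Seconds(10)) // placeholder batch interval

    val inputDirectory = "hdfs:///path/to/input" // placeholder path

    // newFilesOnly = false also picks up files already present in the directory;
    // the filter skips HDFS staging files, as in the Java answer.
    val linesRDD = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputDirectory,
      (path: Path) => !path.getName.contains("COPYING"),
      false
    ).map(_._2.toString)

    linesRDD.print()
    ssc.start()
    ssc.awaitTermination()

With the new-API import in scope, the three type arguments satisfy F <: org.apache.hadoop.mapreduce.InputFormat[K, V] and the overload resolves.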

Regarding scala - Spark Streaming: error in fileStream(), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33076339/
