- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我们有很多小文件需要合并。在 Scalding 中,您可以使用 TextLine
将文件读取为文本行。问题是我们每个文件有 1 个映射器,但我们想要组合多个文件,以便它们由 1 个映射器处理。
我知道我们需要将输入格式更改为 CombineFileInputFormat
的实现,这可能涉及使用级联 CombinedHfs
。我们不知道如何做到这一点,但它应该只是几行代码来定义我们自己的 Scalding 源,例如 CombineTextLine
。
非常感谢任何可以提供代码的人。
作为附带问题,我们在 s3 中有一些数据,如果给定的解决方案适用于 s3 文件,那就太好了——我想这取决于 CombineFileInputFormat
还是 CombinedHfs
适用于 s3。
最佳答案
您的问题中包含了想法,所以这可能是适合您的解决方案。
创建您自己的输入格式,扩展 CombineFileInputFormat 并使用您自己的自定义 RecordReader。我正在向您展示 Java 代码,但如果您愿意,您可以轻松地将其转换为 Scala。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {
public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
private final RecordReader<LongWritable,Text> delegate;
public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
delegate = new LineRecordReader(conf, fileSplit);
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
return delegate.next(key, value);
}
@Override
public LongWritable createKey() {
return delegate.createKey();
}
@Override
public Text createValue() {
return delegate.createValue();
}
@Override
public long getPos() throws IOException {
return delegate.getPos();
}
@Override
public void close() throws IOException {
delegate.close();
}
@Override
public float getProgress() throws IOException {
return delegate.getProgress();
}
}
@Override
public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
}
}
然后您需要扩展 TextLine 类并使其使用您刚刚定义的自己的输入格式(从现在开始使用 Scala 代码)。
import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource, TextLineScheme}
import cascading.scheme.Scheme
class CombineFileTextLine extends TextLine{
override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) {
super.sourceConfInit(flowProcess, tap, conf)
conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
}
}
为您的组合输入创建方案。
trait CombineFileTextLineScheme extends TextLineScheme{
override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}
最后,创建您的源类:
case class CombineFileMultipleTextLine(p : String*) extends FixedPathSource(p :_*) with CombineFileTextLineScheme
如果您想使用单个路径而不是多个路径,对源类的更改是微不足道的。
希望对您有所帮助。
关于scala - 创建像 TextLine 这样的 Scalding Source,将多个文件组合成单个映射器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23917404/
不久前我收到了这个异常,我从堆栈跟踪中找不到任何有用的东西,Crashlytics 报告它发生在 Android 8 和 9 上 这是堆栈跟踪: Fatal Exception: java.lang.
我是 C++ 的新手,我该怎么做才能执行如下操作: getline(textfile, txtline); int i = 0; while (textline[i] != ' ') // Until
我们有很多小文件需要合并。在 Scalding 中,您可以使用 TextLine 将文件读取为文本行。问题是我们每个文件有 1 个映射器,但我们想要组合多个文件,以便它们由 1 个映射器处理。 我知道
我从这里的一位好人那里收到了这段代码,他们愿意花时间和精力与菜鸟分享他们的知识: Sub ReadLinesFromAFileOneAfterAnother () Const ForReading =
我是一名优秀的程序员,十分优秀!