gpt4 book ai didi

java - Map Reduce程序将多个xml文件合并为一个xml文件

转载 作者:可可西里 更新时间:2023-11-01 16:07:21 27 4
gpt4 key购买 nike

我是 Hindsight 和 Hadoop map reduce 概念的新手。我正在尝试使用 map reduce 程序将多个 XML 文件合并为一个 XML 文件。我的目的是通过将文件名作为开始和结束标记添加和附加到文件名中,将每个 XML 文件合并到目标 XML 文件中。例如。以下 XML 应合并为如下所示的单个 XML

输入 XML 文件

<xml><a></a></xml>
<xml><b></b></xml>
<xml><c></c></xml>

输出 XML 文件

<xml>
<File1Name><xml><a></a></xml><File2Name>
<File2Name><xml><b></b></xml><File3Name>
<File3Name><xml><c></c></xml><File3Name>
<xml>

问题 1:是否可以将一个 XML 文件映射到每个映射器并创建一个键值对,键作为文件名,值作为每个 XML 文件的前缀和附加文件名作为开始和结束标记以及缩减器进行合并所有 XML 到单个上下文并输出到如上所示的 XML。

问题 2:如何在映射器代码中获取文件名作为键?

最佳答案

答案 1:我不建议只向映射器发送单个 XML,除非文件超过 1gb。您可以将 xml 位置列表发送到您的映射器,然后在您的映射器代码中打开每个位置并将数据提取到您的输出中。

答案 2:如果使用 azure blob 存储,您可以列出容器中的所有 blob,并将它们分配给输入拆分。

How to create your list of InputSplits:
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();

/*Do this for each path we receive. Creates a directory of splits in this order s = input path (S1,1),(s2,1)…(sN,1),(s1,2),(sN,2),(sN,3) etc..
*/
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits,numNameHashSplits–1); i++) {
for (Path inputPath : inputPaths) {
ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
System.out.println(i + ” “+inputPath.toString());
}
}
return ret;
}
}

Once the List<InputSplits> is assembled, each InputSplit is handed to a Record Reader class where each Key, Value, pair is read then passed to the map task. The initialization of the recordreader class uses the InputSplit, a string representing the location of a “folder” of invoices in blob storage, to return a list of all blobs within the folder, the blobs variable below. The below Java code demonstrates the creation of the record reader for each hashslot and the resulting list of blobs in that location.

Public class ParseDirectoryFileNameRecordReader

extends RecordReader<IntWritable, Text> {
private int nameHashSlot;
private int numNameHashSlots;
private Path myDir;
private Path currentPath;
private Iterator<ListBlobItem> blobs;
private int currentLocation;

public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
myDir = ((ParseDirectoryInputSplit)split).getDirectoryPath();

//getNameHashSlot tells us which slot this record reader is responsible for
nameHashSlot = ((ParseDirectoryInputSplit)split).getNameHashSlot();

//gets the total number of hashslots
numNameHashSlots = getNumNameHashSplits(context.getConfiguration());

//gets the input credientals to the storage account assigned to this record reader.
String inputCreds = getInputCreds(context.getConfiguration());

//break the directory path to get account name
String[] authComponents = myDir.toUri().getAuthority().split(“@”);
String accountName = authComponents[1].split(“\\.”)[0];
String containerName = authComponents[0];
String accountKey = Utils.returnInputkey(inputCreds, accountName);
System.out.println(“This mapper is assigned the following account:”+accountName);
StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName,accountKey);
CloudStorageAccount account = new CloudStorageAccount(creds);
CloudBlobClient client = account.createCloudBlobClient();
CloudBlobContainer container = client.getContainerReference(containerName);
blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + “/”, true,EnumSet.noneOf(BlobListingDetails.class), null,null).iterator();
currentLocation = –1;
return;
}

Once initialized, the record reader is used to pass the next key to the map task. This is controlled by the nextKeyValue method, and it is called every time map task starts. The blow Java code demonstrates this.




//This checks if the next key value is assigned to this task or is assigned to another mapper. If it assigned to this task the location is passed to the mapper, otherwise return false
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
while (blobs.hasNext()) {
ListBlobItem currentBlob = blobs.next();

//Returns a number between 1 and number of hashslots. If it matches the number assigned to this Mapper and its length is greater than 0, return the path to the map function
if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
String[] pathComponents = currentBlob.getUri().getPath().split(“/”);

String pathWithoutContainer =
currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);

currentPath = new Path(myDir.toUri().getScheme(), myDir.toUri().getAuthority(),pathWithoutContainer);

currentLocation++;
return true;
}
}
return false;
}

The logic in the map function is than simply as follows, with inputStream containing the entire XML string

Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

//Input stream contains all data from the blob in the location provided by Text
FSDataInputStream inputStream = fs.open(inputFile);

资源:

http://www.andrewsmoll.com/3-hacks-for-hadoop-and-hdinsight-clusters/ “黑客 3”

http://blogs.msdn.com/b/mostlytrue/archive/2014/04/10/merging-small-files-on-hdinsight.aspx

关于java - Map Reduce程序将多个xml文件合并为一个xml文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34884505/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com