gpt4 book ai didi

java - Hadoop HDFS文件拆分成 block 的哪个Java文件

转载 作者:可可西里 更新时间:2023-11-01 15:30:15 28 4
gpt4 key购买 nike

众所周知,当一个文本文件从本地复制到 HDFS 时,该文件被分割成固定大小的 128 MB。例如,当我将一个 256 MB 的文本文件复制到 HDFS 时,将有 2 个 block (256/128) 包含“拆分”文件。

谁能告诉我 Hadoop 2.7.1 源代码中的哪个 java/jar 文件具有将文件拆分为 block 的功能,以及哪个 java/jar 文件将 block 写入数据节点的目录.

帮我追踪这段代码。

我只找到了在 FileInputFormat.java 中找到的对 block 进行逻辑输入拆分的那个,这不是我需要的。我需要用于拆分物理文件的 java 文件。

最佳答案

将数据写入 DataNode 的代码存在于 2 个文件中:

  • DFSOutputStream.java(包:org.apache.hadoop.hdfs)

    客户端写入的数据被分成数据包(通常为 64k 大小)。当一个数据包准备就绪时,数据被排入数据队列,由 DataStreamer 拾取。

  • DataStreamer(包:org.apache.hadoop.hdfs)

    它拾取数据队列中的数据包并将它们发送到管道中的数据节点(通常数据管道中有 3 个数据节点,因为复制因子为 3)。

    它检索一个新的 block ID 并开始将数据流式传输到数据节点。当写入一个数据 block 时,它会关闭当前 block 并获取一个新 block 来写入下一组数据包。

    获取新区 block 的代码如下:

    // get new block from namenode.
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
    if(LOG.isDebugEnabled()) {
    LOG.debug("Allocating new block");
    }
    setPipeline(nextBlockOutputStream());
    initDataStreaming();
    }

    关闭当前 block 的代码如下:

    // Is this block full?
    if (one.isLastPacketInBlock()) {
    // wait for the close packet has been acked
    synchronized (dataQueue) {
    while (!shouldStop() && ackQueue.size() != 0) {
    dataQueue.wait(1000);// wait for acks to arrive from datanodes
    }
    }
    if (shouldStop()) {
    continue;
    }

    endBlock();
    }

    endBlock() 方法中,阶段再次设置为:

    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;

    这意味着,一个新的管道被创建用于将下一组数据包写入一个新的 block 。

编辑:如何检测 block 结束?

随着 DataStreamer 不断将数据附加到 block 中,它会更新写入的字节数。

/**
* increase bytes of current block by len.
*
* @param len how many bytes to increase to current block
*/
void incBytesCurBlock(long len) {
this.bytesCurBlock += len;
}

它还会不断检查写入的字节数是否等于 block 大小:

// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}

在上面的语句中,以下条件检查是否达到 block 大小:

getStreamer().getBytesCurBlock() == blockSize)

如果遇到 block 边界,则调用 endBlock() 方法:

/**
* if encountering a block boundary, send an empty packet to
* indicate the end of block and reset bytesCurBlock.
*
* @throws IOException
*/
protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) {
setCurrentPacketToEmpty();
enqueueCurrentPacket();
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}

这将确保当前 block 被关闭并从 Name Node 获取新 block 以写入数据。

block 大小由 hdfs-site.xml 文件中的 dfs.blocksize 参数决定(在我的集群中设置为 128 MB = 134217728):

<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>The default block size for new files, in bytes.
You can use the following suffix (case insensitive): k(kilo),
m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
size (such as 128k, 512m, 1g, etc.), Or provide complete size
in bytes (such as 134217728 for 128 MB).
</description>
</property>

关于java - Hadoop HDFS文件拆分成 block 的哪个Java文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34754356/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com