
hadoop - How to rename files when writing to HDFS through Flume with a spooling directory source

Reposted. Author: 可可西里. Updated: 2023-11-01 16:03:49

I am using the Flume spooling directory source to write to HDFS. Here is my configuration:

# Initialize the agent's source, channel and sink
agent.sources = test
agent.channels = memoryChannel
agent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists
agent.sources.test.type = spooldir
agent.sources.test.spoolDir = /johir
agent.sources.test.fileHeader = false
agent.sources.test.fileSuffix = .COMPLETED

# Setting the channel to memory
agent.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
agent.channels.memoryChannel.capacity = 10000
# agent.channels.memoryChannel.batchSize = 15000
# Property names are case-sensitive, and transactionCapacity must not exceed capacity
agent.channels.memoryChannel.transactionCapacity = 1000

# Setting the sink to HDFS
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path = /user/root/
agent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable
agent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time
agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# Roll over based on file size only (1000000 bytes, about 1 MB);
# a value of 0 disables rolling by event count and by time interval
agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollInterval = 0
agent.sinks.flumeHDFS.hdfs.rollSize = 1000000
agent.sinks.flumeHDFS.hdfs.batchSize = 1000

# never rollover based on the number of events
agent.sinks.flumeHDFS.hdfs.rollCount = 0

# Time-based rollover and idle timeout are left disabled
#agent.sinks.flumeHDFS.hdfs.rollInterval = 0
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel
agent.sources.test.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel

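The problem is that the data written to HDFS ends up in a file with a random tmp name. How can I give the files in HDFS the same names as the original files in the source directory? For example, I have the files day1.txt, day2.txt and day3.txt. These hold data for two days. I want to store them in HDFS as day1.txt, day2.txt and day3.txt. Instead, the three files are merged and stored in HDFS as a single file named FlumeData.1464629158164.tmp. Is there any way to do this?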

Best answer

If you want to keep the original file name, you should attach the file name as a header to each event.

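  1. Set the basenameHeader property to true. This adds a header with the key basename to each event, unless a different key is configured with the basenameHeaderKey property.
  2. Use the hdfs.filePrefix property to set the file name from the value of the basename header.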

Add the following properties to your configuration file.

#source properties
agent.sources.test.basenameHeader = true

#sink properties
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.filePrefix = %{basename}
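
Step 1 mentions that the header key can be changed with basenameHeaderKey. A minimal sketch of that variant is shown below. It assumes the key name origName, which is made up for illustration and is not part of the original answer. The same key must then be used in the sink's escape sequence.

```properties
# Source: store each event's file basename under a custom header key.
# "origName" is a hypothetical key name chosen for this example.
agent.sources.test.basenameHeader = true
agent.sources.test.basenameHeaderKey = origName

# Sink: build the HDFS file prefix from that same header.
agent.sinks.flumeHDFS.hdfs.filePrefix = %{origName}
```

With either variant, the HDFS sink still appends its own counter to the prefix, and a .tmp suffix while the file is open. The stored files should therefore look like day1.txt.1464629158164 rather than exactly day1.txt. Each distinct prefix also gets its own output file, so a setting such as hdfs.maxOpenFiles = 1 may need to be revisited.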

Regarding "hadoop - How to rename files when writing to HDFS through Flume with a spooling directory source", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37531021/
