I'm trying to use Flume-ng to capture 90 seconds' worth of log information and write it to a file in HDFS. I have Flume watching the log file via an exec source with tail, but it creates a new file every 5 seconds instead of every 90 seconds.
My flume.conf is as follows:
# example.conf: A single-node Flume configuration
# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# this parameter seems to be getting overridden
agent1.sinks.sink1.hdfs.rollInterval=90
agent1.sinks.sink1.hdfs.rollSize=0
agent1.sinks.sink1.hdfs.hdfs.rollCount = 0
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
13/01/03 09:43:02 INFO properties.PropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume-ng/conf/flume.conf
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Processing:sink1
13/01/03 09:43:02 INFO conf.FlumeConfiguration: Added sinks: sink1 Agent: agent1
13/01/03 09:43:03 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent1]
13/01/03 09:43:03 INFO properties.PropertiesFileConfigurationProvider: Creating channels
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: channel1, registered successfully.
13/01/03 09:43:03 INFO properties.PropertiesFileConfigurationProvider: created channel channel1
13/01/03 09:43:03 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: hdfs
13/01/03 09:43:03 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: sink1, registered successfully.
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{source1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:source1,state:IDLE} }} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1a50ca0c counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.MemoryChannel{name: channel1}} }
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel channel1
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink sink1
13/01/03 09:43:03 INFO nodemanager.DefaultLogicalNodeManager: Starting Source source1
13/01/03 09:43:03 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
13/01/03 09:43:03 INFO source.ExecSource: Exec source starting with command:tail -f /home/cloudera/LogCreator/fortune_log.log
13/01/03 09:43:07 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186506.tmp
13/01/03 09:43:08 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186506.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186506
13/01/03 09:43:08 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186507.tmp
13/01/03 09:43:12 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186507.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186507
13/01/03 09:43:12 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186508.tmp
13/01/03 09:43:12 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186508.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186508
13/01/03 09:43:12 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186509.tmp
13/01/03 09:43:18 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186509.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186509
13/01/03 09:43:18 INFO hdfs.BucketWriter: Creating hdfs://localhost/flume/logtest//LogCreateTest.1357224186510.tmp
13/01/03 09:43:18 INFO hdfs.BucketWriter: Renaming hdfs://localhost/flume/logtest/LogCreateTest.1357224186510.tmp to hdfs://localhost/flume/logtest/LogCreateTest.1357224186510
Best answer
Rewrite the configuration file and spell out a more complete set of parameters. The example below rolls a file after 10,000 records or 10 minutes, whichever comes first. I also switched from a memory channel to a file channel to improve the reliability of the data flow. The frequent rolls in your original configuration are most likely the HDFS sink's defaults taking effect: rollCount defaults to 10 events and rollSize to 1024 bytes, and the doubled prefix in agent1.sinks.sink1.hdfs.hdfs.rollCount means your intended rollCount = 0 was never actually applied.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/cloudera/LogCreator/fortune_log.log
# Describe sink1
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost/flume/logtest/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.sink1.hdfs.rollInterval = 600
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.sink1.hdfs.rollSize = 0
# Number of events written to file before it is rolled (0 = never roll based on number of events)
agent1.sinks.sink1.hdfs.rollCount = 10000
# Number of events written to file before they are flushed to HDFS
agent1.sinks.sink1.hdfs.batchSize = 10000
agent1.sinks.sink1.hdfs.txnEventMax = 40000
# -- Compression codec. one of following : gzip, bzip2, lzo, snappy
# hdfs.codeC = gzip
#format: currently SequenceFile, DataStream or CompressedStream
#(1)DataStream will not compress output file and please don't set codeC
#(2)CompressedStream requires set hdfs.codeC with an available codeC
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.maxOpenFiles=50
# hdfs.writeFormat -- "Text" or "Writable"
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
agent1.sinks.sink1.hdfs.threadsPoolSize=100
# Number of threads per HDFS sink for scheduling timed file rolling
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1
# hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
# hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
# hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
# hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
# hdfs.roundUnit second The unit of the round down value - second, minute or hour.
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
# serializer.*
# Use a channel which buffers events to a file
# -- The component type name, needs to be FILE.
agent1.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored
# The maximum size of transaction supported by the channel
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints
agent1.channels.channel1.checkpointInterval = 30000
# Max size (in bytes) of a single log file
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel
agent1.channels.channel1.capacity = 10000000
#keep-alive 3 Amount of time (in sec) to wait for a put operation
#write-timeout 3 Amount of time (in sec) to wait for a write operation
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
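For reference, a minimal sketch of how the agent could then be launched, assuming the configuration is saved as /etc/flume-ng/conf/flume.conf (the path shown in the log output above):
# start agent1 in the foreground and log to the console
flume-ng agent --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/flume.conf --name agent1 -Dflume.root.logger=INFO,console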
Regarding "hdfs - Using the HDFS Sink and rollInterval in Flume-ng to batch 90 seconds of log information", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/14141287/