
hadoop - What is the expected commit/rollback behavior of Camus?


We have been running Camus for about a year, successfully pulling avro payloads from Kafka (version 0.82) and storing them as .avro files in HDFS, using only a handful of Kafka topics. Recently, a new team within our company registered about 60 new topics in our pre-production environment and started sending data to them. The team made some mistakes when routing their data to Kafka topics, which caused errors when Camus deserialized the payloads of those topics to avro. The Camus job failed because it exceeded the "failed other" error threshold. The behavior in Camus after the failure was surprising, and I wanted to check with other developers to see whether the behavior we observed is expected, or whether we have a problem in our implementation.

We noticed this behavior when the Camus job failed for exceeding the "failed other" threshold:

1. All of the mapper tasks succeeded, so the TaskAttempts were allowed to commit - meaning all of the data written by Camus was copied to its final HDFS location.
2. The CamusJob threw an exception while computing the % error rate (this happens after the mappers commit), which failed the job.
3. Because the job failed (I think), the Kafka offsets were not advanced.

The problem we have with this behavior is that our Camus job is scheduled to run every 5 minutes. So every 5 minutes we saw data get committed to HDFS, the job fail, and the Kafka offsets not get updated - which means we kept writing duplicate data until we noticed our disks were filling up.
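To make the ordering easier to follow, here is a minimal Java sketch of the sequence we believe we are seeing. This is an illustration only, not Camus's actual source; percentSkippedOther and persistOffsets are hypothetical stand-ins for the real internals:

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CommitThenValidateSketch {

    private static final double MAX_ALLOWED_PERCENT = 0.1;

    public void run(Job job) throws Exception {
        // 1. Mappers run; every successful TaskAttempt is allowed to
        //    commit, so its .avro files are moved from the workspace
        //    to the final etl.destination.path by the output committer.
        job.waitForCompletion(true);

        // 2. Only AFTER the data is in its final location are the
        //    counters inspected (in our test: 10 SKIPPED_OTHER out of
        //    20 events = 50%).
        double skippedOther = percentSkippedOther(job.getCounters());
        if (skippedOther > MAX_ALLOWED_PERCENT) {
            // 3. The exception aborts the run BEFORE new Kafka offsets
            //    are persisted, so the next run re-reads and re-writes
            //    the same messages -- duplicates on every 5-minute run.
            throw new RuntimeException("job failed: " + skippedOther
                + "% messages skipped due to other, maximum allowed is "
                + MAX_ALLOWED_PERCENT + "%");
        }

        persistOffsets(job); // never reached when the check fails
    }

    // Hypothetical helpers standing in for Camus internals.
    private double percentSkippedOther(Counters counters) { return 50.0; }
    private void persistOffsets(Job job) { /* write offsets to HDFS */ }
}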

I wrote an integration test that confirms the result - it submits 10 good records to a topic along with 10 records that use an unexpected schema to the same topic, runs the Camus job with only that topic whitelisted, and we can see that 10 records are written to HDFS and the Kafka offsets are not advanced. Below is a snippet of the logs from that test, along with the properties we used when running the job.
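For anyone who wants to reproduce this, the producer side of that test looks roughly like the sketch below. The payload helpers are placeholders for our real avro fixtures; only the KafkaProducer calls are actual Kafka client API:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CamusDuplicateTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<kafka brokers>");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        String topic = "advertising.edmunds.admax";
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // a record that matches the schema registered for the topic
                producer.send(new ProducerRecord<>(topic, goodAvroPayload(i)));
                // a record encoded with an unexpected schema
                producer.send(new ProducerRecord<>(topic, badSchemaPayload(i)));
            }
        }
        // Then run CamusJob with kafka.whitelist.topics=advertising.edmunds.admax
        // and assert: 10 records land in etl.destination.path, offsets unchanged.
    }

    // Placeholders for our real test fixtures.
    private static byte[] goodAvroPayload(int i) { return new byte[0]; }
    private static byte[] badSchemaPayload(int i) { return new byte[0]; }
}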

Any help is appreciated - I'm not sure whether this is expected behavior for Camus or whether we have a problem with our implementation, and what the best way to prevent this behavior (duplicate data) would be.

Thanks, Matt

CamusJob properties used for the test:

etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false

mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3

kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true

kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10

etl.daily=daily
etl.ignore.schema.errors=false

etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2
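Note that none of the properties above set the "failed other" threshold explicitly, so the 0.1% in the failure message below appears to be Camus's default. If you want a different tolerance, the threshold is configurable in CamusJob (in the version we run the property looks like max.percent.skipped.other, e.g. max.percent.skipped.other=1.0 to allow 1%, but check your fork's source for the exact name).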

Log snippet from the test, showing the commit behavior after the mappers succeed, and the subsequent job failure due to exceeding the "other" threshold:

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] - map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read: 117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations: 0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations: 0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records: 15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records: 0
[CamusJob] - Failed Shuffles: 0
[CamusJob] - Merged Map outputs: 0
[CamusJob] - GC time elapsed (ms): 13
[CamusJob] - Total committed heap usage (bytes): 251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%
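(Note how the 50.0% figure follows directly from the counters above: 10 SKIPPED_OTHER out of 20 total events.)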

Best Answer

I am facing a very similar problem: my Kafka/Camus pipeline had been running well for about a year, but recently I ran into duplicates while integrating ingestion from a remote broker with a very unstable connection and frequent job failures.

While checking the Gobblin documentation today, I realized that the Camus sweeper may be the tool we are looking for. Try integrating it into your pipeline.

I also think it would be a good idea to migrate to Gobblin (Camus's successor) in the near future.

Regarding "hadoop - What is the expected commit/rollback behavior of Camus?", a matching question can be found on Stack Overflow: https://stackoverflow.com/questions/38287017/
