gpt4 book ai didi

hadoop - 使用 HiveStorageHandler 的 Kafka 生产者

转载 作者:可可西里 更新时间:2023-11-01 16:35:40 26 4
gpt4 key购买 nike

我对 hive/hadoop 比较陌生

我正在阅读这个Hive Storage Handlers .

现在我正在尝试编写 HiveStorageHandler 的自定义实现,以使用 Hive 表查询消息并将消息推送到 Kafka。

我看到还有 HiveStorageHandler 的其他实现,它允许我们使用配置单元表在 NoSQL 数据库上查询和写入。

我正在尝试为 Kafka 复制它。我在上面找到了一个项目

HiveKa - query Kafka using Hive

在这里,他们试图使用配置单元表上的查询从 Kafka 读取数据。我希望使用 insert on the table 来写关于 kafka 的话题。

有人可以指导我吗?

最佳答案

I wish to write on the kafka topic using insert on the table.

这可以使用 Kafka HiveStorageHandler 实现。以下是此功能可能的一般用例

  1. 查询 Kafka 主题
  2. 从 Kafka 主题查询数据并插入到 hive 管理/外部表
  3. 从 Kafka 主题查询数据并推送到其他 Kafka 主题
  4. 从 hive 外部/托管表查询数据并推送到 Kafka 主题

您正在尝试执行第三个用例。

首先为源和目标 Kafka 主题创建两个外部表。

create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);


create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);

然后使用合并查询向目标Kafka主题中插入数据

merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);

注意:

  1. 使用Hive外部非 native 表

  2. 除了用户定义的有效负载架构外,Kafka 存储处理程序还附加了 4 个额外的列(__key、__partition、__offset、__timestmap),用户可以使用这些列来查询 Kafka 元数据字段

  3. 如果数据不是 csv 格式,用户必须设置 'kafka.serde.class' 表属性

  4. 用户还可以设置“kafka.write.semantic”表属性,该属性允许 NONE、AT_LEAST_ONCE 或 EXACTLY_ONCE 值。

关于hadoop - 使用 HiveStorageHandler 的 Kafka 生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53465743/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com