gpt4 book ai didi

java - 使用架构自动检测写入 BigQuery 的数据流作业

转载 作者:行者123 更新时间:2023-11-30 01:44:10 24 4
gpt4 key购买 nike

目前我们正在寻找将原始数据转换为通用结构以供进一步分析的最佳方法。我们的数据是 JSON 文件,有些文件的字段较多,有些文件的字段较少,有些可能有数组,但总的来说结构是一样的。

为此,我正在尝试用 Java 构建 Apache Beam 管道。我所有的管道都基于此模板:TextIOToBigQuery.java

第一种方法是将整个 JSON 作为字符串加载到一列中,然后使用 JSON Functions in Standard SQL转变为通用结构。这里有很好的描述:How to manage/handle schema changes while loading JSON file into BigQuery table

第二种方法是将数据加载到适当的列中。现在可以通过标准 SQL 查询数据。它还需要了解架构。可以通过控制台、UI 和其他方式检测到它:Using schema auto-detection ,但是我没有找到任何关于如何通过 Java 和 Apache Beam 管道实现这一点的信息。

我分析了BigQueryIO看起来如果没有模式它就无法工作(有一个异常(exception),如果表已经创建)

正如我之前提到的,新文件可能会带来新字段,因此架构应该相应更新。

假设我有三个 JSON 文件:

1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }

第一个创建新表,其中包含一个字符串类型的字段“field1”。所以我的表格应该如下所示:

|field1  |
----------
|"value1"|

第二个执行相同的操作,但添加新字段“field2”。现在我的表格应该如下所示:

|field1  |field2  |
-------------------
|"value1"|null |
-------------------
|null |"value2"|

第三个 JSON 应该将另一个字段“field10”添加到架构中,依此类推。真实的 JSON 文件可能有 200 个或更多字段。处理这种情况有多困难?

哪种方式更好地进行这种转换?

最佳答案

我做了一些测试,模拟典型的自动检测模式:首先我运行所有数据以构建所有可能字段和类型的 Map (这里我只考虑 为了简单起见,字符串整数)。我用stateful管道来跟踪已经看到的字段并将其保存为 PCollectionView。这样我就可以使用.withSchemaFromView()因为在管道 build 中模式是未知的。请注意,此方法仅对批处理作业有效。

首先,我创建一些没有严格模式的虚拟数据,其中每行可能包含也可能不包含任何字段:

PCollection<KV<Integer, String>> input = p
.apply("Create data", Create.of(
KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
);

我们将读取输入数据并构建我们在数据中看到的不同字段名称的Map,并进行基本类型检查以确定它是否包含INTEGER > 或 STRING。当然,如果需要的话,这可以延长。请注意,之前创建的所有数据都分配给相同的键,以便将它们分组在一起,我们可以构建完整的字段列表,但这可能是性能瓶颈。我们具体化输出,以便可以将其用作辅助输入:

PCollectionView<Map<String, String>> schemaSideInput = input  
.apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {

// A map containing field-type pairs
@StateId("schema")
private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

@ProcessElement
public void processElement(ProcessContext c,
@StateId("schema") ValueState<Map<String, String>> schemaSpec) {
JSONObject message = new JSONObject(c.element().getValue());
Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());

// iterate through fields
message.keySet().forEach(key ->
{
Object value = message.get(key);

if (!current.containsKey(key)) {
String type = "STRING";

try {
Integer.parseInt(value.toString());
type = "INTEGER";
}
catch(Exception e) {}

// uncomment if debugging is needed
// LOG.info("key: "+ key + " value: " + value + " type: " + type);

c.output(KV.of(key, type));
current.put(key, type);
schemaSpec.write(current);
}
});
}
})).apply("Save as Map", View.asMap());

现在我们可以使用之前的 Map 构建包含 BigQuery 表架构的 PCollectionView:

PCollectionView<Map<String, String>> schemaView = p
.apply("Start", Create.of("Start"))
.apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> schemaFields = c.sideInput(schemaSideInput);
List<TableFieldSchema> fields = new ArrayList<>();

for (Map.Entry<String, String> field : schemaFields.entrySet())
{
fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
// LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
}

TableSchema schema = new TableSchema().setFields(fields);

String jsonSchema;
try {
jsonSchema = Transport.getJsonFactory().toString(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}

c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));

}}).withSideInputs(schemaSideInput))
.apply("Save as Singleton", View.asSingleton());

相应地更改完全限定表名称PROJECT_ID:DATASET_NAME.dynamic_bq_schema

最后,在管道中,我们读取数据,将其转换为 TableRow 并使用 .withSchemaFromView(schemaView) 将其写入 BigQuery:

input
.apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject message = new JSONObject(c.element().getValue());
TableRow row = new TableRow();

message.keySet().forEach(key ->
{
Object value = message.get(key);
row.set(key, value);
});

c.output(row);
}}))
.apply( BigQueryIO.writeTableRows()
.to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
.withSchemaFromView(schemaView)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

完整代码here .

管道创建的 BigQuery 表架构:

enter image description here

以及由此产生的稀疏数据:

enter image description here

关于java - 使用架构自动检测写入 BigQuery 的数据流作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58794005/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com