gpt4 book ai didi

google-cloud-dataflow - 在 Google Dataflow/Apache Beam 中读取嵌套的 JSON

转载 作者:行者123 更新时间:2023-12-01 23:08:15 25 4
gpt4 key购买 nike

可以通过以下方式使用 Dataflow 读取 Cloud Storage 上未嵌套的 JSON 文件:

p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));

如果我只想将这些日志以最少的过滤写入 BigQuery,我可以使用这样的 DoFn:
private static class Formatter extends DoFn<TableRow,TableRow> {

@Override
public void processElement(ProcessContext c) throws Exception {

// .clone() since input is immutable
TableRow output = c.element().clone();

// remove misleading timestamp field
output.remove("@timestamp");

// set timestamp field by using the element's timestamp
output.set("timestamp", c.timestamp().toString());

c.output(output);
}
}
}

但是,我不知道如何以这种方式访问​​ JSON 文件中的嵌套字段。
  • 如果 TableRow 包含 RECORD命名 r , 是否可以在不进一步序列化/反序列化的情况下访问其键/值?
  • 如果我需要使用 Jackson 序列化/反序列化自己库,使用标准 Coder 是否更有意义?的 TextIO.Read而不是 TableRowJsonCoder ,以这种方式获得一些我失去的性能?

  • 编辑

    这些文件以换行符分隔,如下所示:
    {"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
    {"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}

    最佳答案

    您最好的选择可能是按照您在 #2 中描述的方法并直接使用 Jackson。最有意义的是让 TextIO 读取执行它的构建目的——使用字符串编码器从文件中读取行——然后使用 DoFn实际解析元素。类似于以下内容:

    PCollection<String> lines = pipeline
    .apply(TextIO.from("gs://bucket/..."));
    PCollection<TableRow> objects = lines
    .apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(ProcessContext c) {
    String json = c.element();
    SomeObject object = /* parse json using Jackson, etc. */;
    TableRow row = /* create a table row from object */;
    c.output(row);
    }
    });

    请注意,您也可以使用多个 ParDo 来执行此操作。

    关于google-cloud-dataflow - 在 Google Dataflow/Apache Beam 中读取嵌套的 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41984229/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com