gpt4 book ai didi

database - 如何从 TFX BulkInferrer 获取数据帧或数据库写入?

转载 作者:行者123 更新时间:2023-12-04 11:39:57 29 4
gpt4 key购买 nike

我对 TFX 很陌生,但有一个明显有效的 ML 管道,可通过 BulkInferrer 使用。 .这似乎只以 Protobuf 格式生成输出,但由于我正在运行批量推理,我想将结果通过管道传输到数据库。 (DB 输出似乎应该是批量推理的默认值,因为批量推理和 DB 访问都利用了并行化......但 Protobuf 是一种每条记录的序列化格式。)
我想我可以使用类似 Parquet-Avro-Protobuf 的东西进行转换(尽管这是在 Java 中,而管道的其余部分在 Python 中),或者我可以自己编写一些东西来逐一使用所有 protobuf 消息,将它们转换为 JSON,将 JSON 反序列化为一个字典列表,然后将 dict 加载到 Pandas DataFrame 中,或者将其存储为一堆键值对,我将其视为一次性数据库......但这听起来像是很多工作和痛苦,涉及并行化和优化常见用例。顶层 Protobuf 消息定义是 Tensorflow 的 PredictionLog .
这一定是一个常见的用例,因为 TensorFlowModelAnalytics 的功能类似于 this one使用 Pandas 数据帧。我宁愿能够直接写入数据库(最好是 Google BigQuery)或 Parquet 文件(因为 Parquet/Spark 似乎比 Pandas 并行化得更好),而且,这些似乎应该是常见用例,但我没有找到任何例子。也许我使用了错误的搜索词?
我也看了PredictExtractor ,因为“提取预测”听起来接近我想要的......但官方文档似乎没有说明应该如何使用该类。我以为TFTransformOutput听起来像是一个有前途的动词,但实际上它是一个名词。
我显然在这里遗漏了一些基本的东西。有没有人想将 BulkInferrer 结果存储在数据库中的原因?是否有允许我将结果写入数据库的配置选项?也许我想添加一个 ParquetIOBigQueryIO实例到 TFX 管道? (TFX 文档说它使用 Beam“under the hood”,但这并没有说明我应该如何将它们一起使用。)但是这些文档中的语法看起来与我的 TFX 代码完全不同,我不确定它们是否重新兼容?
帮助?

最佳答案

(从相关问题复制以提高知名度)
经过一番挖掘,这里是另一种方法,它假设不知道 feature_spec预先。请执行下列操作:

  • 设置 BulkInferrer写信给 output_examples而不是 inference_result通过添加 output_example_spec到组件构建。
  • 添加 StatisticsGen和一个 SchemaGen BulkInferrer 之后的主管道中的组件为上述 output_examples 生成模式
  • 使用来自 SchemaGen 的工件和 BulkInferrer阅读TFRecords并做任何必要的事情。

  • bulk_inferrer = BulkInferrer(
    ....
    output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
    output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
    predict_output=bulk_inferrer_pb2.PredictOutput(
    output_columns=[bulk_inferrer_pb2.PredictOutputCol(
    output_key='original_label_name',
    output_column='output_label_column_name', )]))]
    ))

    statistics = StatisticsGen(
    examples=bulk_inferrer.outputs.output_examples
    )

    schema = SchemaGen(
    statistics=statistics.outputs.output,
    )
    之后,您可以执行以下操作:
    import tensorflow as tf
    from tfx.utils import io_utils
    from tensorflow_transform.tf_metadata import schema_utils

    # read schema from SchemaGen
    schema_path = '/path/to/schemagen/schema.pbtxt'
    schema_proto = io_utils.SchemaReader().read(schema_path)
    spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec

    # read inferred results
    data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
    dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')

    # parse dataset with spec
    def parse(raw_record):
    return tf.io.parse_example(raw_record, spec)

    dataset = dataset.map(parse)
    在这一点上,数据集就像任何其他解析过的数据集一样,因此写入 CSV 或 BigQuery 表或从那里开始的任何内容都是微不足道的。它确实帮助了我们 ZenML与我们的 BatchInferencePipeline .

    关于database - 如何从 TFX BulkInferrer 获取数据帧或数据库写入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65525944/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com