
python - Dataflow Python SDK Avro Source/Sink

Reposted · Author: 太空宇宙 · Updated: 2023-11-04 10:11:48

I would like to ingest and write Avro files in GCS using the Python SDK. Is this currently possible for Avro with the Python SDK? If so, how would I do it? I saw TODO comments about this in the source code, so I'm not too optimistic.

Best Answer

As of version 2.6.0 of the Apache Beam/Dataflow Python SDK, it is indeed possible to read (and write) Avro files in GCS.
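If you are unsure which SDK version you are on, a quick sanity check (minimal sketch):

import apache_beam as beam

# Avro IO on GCS requires the 2.6.0 SDK or newer
print(beam.__version__)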

Better still, the Beam Python SDK now supports reading and writing with fastavro, which is up to 10x faster than the regular avro IO.
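The use_fastavro path delegates to the fastavro library, so you can also use it directly to inspect a file before wiring up the pipeline. A minimal local sanity check; the file name input.avro is a placeholder for a local copy of one of your files:

from fastavro import reader

# iterate the records of a local Avro file; each record is a plain dict
with open('input.avro', 'rb') as f:
    for record in reader(f):
        print(record)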

Sample code:

import apache_beam as beam
from apache_beam.io import ReadFromAvro
from apache_beam.io import WriteToAvro
import avro.schema


RUNNER = 'DataflowRunner'
GCP_PROJECT_ID = 'YOUR_PROJECT_ID'
BUCKET_NAME = 'YOUR_BUCKET_HERE'
STAGING_LOCATION = 'gs://{}/staging'.format(BUCKET_NAME)
TEMP_LOCATION = 'gs://{}/temp'.format(BUCKET_NAME)
GCS_INPUT = 'gs://{}/input-*.avro'.format(BUCKET_NAME)
GCS_OUTPUT = 'gs://{}/'.format(BUCKET_NAME)
JOB_NAME = 'conversion-test'

SCHEMA_PATH = 'YOUR_AVRO_SCHEMA.avsc'
# note: with the avro-python3 package this call is avro.schema.Parse
AVRO_SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

OPTIONS = {
    'runner': RUNNER,
    'job_name': JOB_NAME,
    'staging_location': STAGING_LOCATION,
    'temp_location': TEMP_LOCATION,
    'project': GCP_PROJECT_ID,
    'max_num_workers': 2,
    'save_main_session': True,
}

PIPELINE = beam.Pipeline(options=beam.pipeline.PipelineOptions(flags=[], **OPTIONS))


def main():
    # note: have to force `use_fastavro` to enable `fastavro`:
    results = PIPELINE | ReadFromAvro(file_pattern=GCS_INPUT, use_fastavro=True)
    results | WriteToAvro(file_path_prefix=GCS_OUTPUT, schema=AVRO_SCHEMA, use_fastavro=True)
    # without this the pipeline is only constructed, never executed
    PIPELINE.run().wait_until_finish()


if __name__ == '__main__':
    import os
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'PATH_TO_YOUR_SERVICE_ACCOUNT_KEY'
    main()
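For a quick test before launching a paid Dataflow job, the same read/write pair can run locally on the DirectRunner. A hedged sketch, reusing the AVRO_SCHEMA parsed above; the input-*.avro and output paths are local placeholders:

import apache_beam as beam
from apache_beam.io import ReadFromAvro, WriteToAvro

# local variant: DirectRunner plus local file paths; the `with` block
# runs the pipeline and waits for it to finish on exit
with beam.Pipeline(options=beam.pipeline.PipelineOptions(flags=[], runner='DirectRunner')) as p:
    (p
     | ReadFromAvro('input-*.avro', use_fastavro=True)
     | WriteToAvro('output', schema=AVRO_SCHEMA, use_fastavro=True))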

Regarding python - Dataflow Python SDK Avro Source/Sink, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37799065/
