python - How to write a dictionary to BigQuery in Dataflow using python

I am trying to read a CSV from GCP Storage, convert it to a dictionary, and then write it to a BigQuery table, like this:

p | ReadFromText("gs://bucket/file.csv")
  | beam.ParDo(BuildAdsRecordFn())
  | WriteToBigQuery('ads_table', dataset='dds', project='doubleclick-2', schema=ads_schema)

where 'doubleclick-2' and 'dds' are an existing project and dataset, and ads_schema is defined as follows:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

BuildAdsRecordFn() is defined as follows:

class AdsRecord:
    dict = {}

    def __init__(self, line):
        record = line.split(",")
        self.dict['Advertiser_ID'] = record[0]
        self.dict['Campaign_ID'] = record[1]
        self.dict['Ad_ID'] = record[2]
        self.dict['Ad_Name'] = record[3]
        self.dict['Click_through_URL'] = record[4]
        self.dict['Ad_Type'] = record[5]


class BuildAdsRecordFn(beam.DoFn):
    def __init__(self):
        super(BuildAdsRecordFn, self).__init__()

    def process(self, element):
        text_line = element.strip()
        ads_record = AdsRecord(text_line).dict
        return ads_record

However, when I run the pipeline, I get the following error:

"dataflow_job_18146703755411620105-B" failed., (6c011965a92e74fa): BigQuery job "dataflow_job_18146703755411620105-B" in project "doubleclick-2" finished with error(s): errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0: Value encountered without start of object

Here is the sample test data I used:

100001,1000011,10000111,ut,https://bloomberg.com/aliquam/lacus/morbi.xml,Brand-neutral
100001,1000011,10000112,eu,http://weebly.com/sed/vel/enim/sit.jsp,Dynamic Click

I am new to Dataflow and Python, so I can't figure out what might be wrong with the code above. Any help is greatly appreciated!

Best Answer

I just tried your code and it didn't work for me either, but I got a different error message (something like "You cannot return a dict as the result of a ParDo").
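
The likely reason: Beam treats whatever process returns as an iterable of output elements, and iterating a dict yields its keys, so each key string would be emitted downstream as a separate element. A minimal plain-Python sketch of that behavior:

record = {'Advertiser_ID': '100001', 'Campaign_ID': '1000011'}

# Iterating a dict yields its keys, not the dict itself:
print(list(record))    # ['Advertiser_ID', 'Campaign_ID']

# Wrapping the dict in a list makes it a single output element:
print(list([record]))  # [{'Advertiser_ID': '100001', 'Campaign_ID': '1000011'}]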

The following code works correctly for me. Note that not only is the class attribute dict gone, but process now returns a list:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

class BuildAdsRecordFn(beam.DoFn):
    def __init__(self):
        super(BuildAdsRecordFn, self).__init__()

    def process(self, element):
        text_line = element.strip()
        ads_record = self.process_row(text_line)
        return ads_record

    def process_row(self, row):
        dict_ = {}

        record = row.split(",")
        dict_['Advertiser_ID'] = int(record[0]) if record[0] else None
        dict_['Campaign_ID'] = int(record[1]) if record[1] else None
        dict_['Ad_ID'] = int(record[2]) if record[2] else None
        dict_['Ad_Name'] = record[3]
        dict_['Click_through_URL'] = record[4]
        dict_['Ad_Type'] = record[5]
        return [dict_]

with beam.Pipeline() as p:

    (p | ReadFromText("gs://bucket/file.csv")
       | beam.Filter(lambda x: x[0] != 'A')
       | beam.ParDo(BuildAdsRecordFn())
       | WriteToBigQuery('ads_table', dataset='dds',
                         project='doubleclick-2', schema=ads_schema))
       #| WriteToText('test.csv'))
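
As an aside, an equivalent way to write the DoFn, which I find slightly more idiomatic, is to yield the dict instead of returning a single-element list. A sketch of the same logic (not tested against your exact data):

class BuildAdsRecordFn(beam.DoFn):
    def process(self, element):
        record = element.strip().split(",")
        # yield emits exactly one output element per input line,
        # so no single-element list wrapper is needed
        yield {
            'Advertiser_ID': int(record[0]) if record[0] else None,
            'Campaign_ID': int(record[1]) if record[1] else None,
            'Ad_ID': int(record[2]) if record[2] else None,
            'Ad_Name': record[3],
            'Click_through_URL': record[4],
            'Ad_Type': record[5],
        }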

Here is the data I mocked up:

Advertiser_ID,Campaign_ID,Ad_ID,Ad_Name,Click_through_URL,Ad_Type
1,1,1,name of ad,www.url.com,sales
1,1,2,name of ad2,www.url2.com,sales with sales

I also filtered out the header row I created in my file (that is what the Filter operation is for); if you don't have a header, it isn't necessary.
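
Note that the x[0] != 'A' check is tied to this particular header. If you want something less brittle, a small variant (my own suggestion, matching the full header prefix instead of its first character):

# drop-in replacement for the Filter step above
| beam.Filter(lambda line: not line.startswith('Advertiser_ID'))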

Regarding "python - How to write a dictionary to BigQuery in Dataflow using python", this is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/47379190/
