- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试从 GCP 存储中读取 csv,将其转换为字典,然后写入 Bigquery 表,如下所示:
p | ReadFromText("gs://bucket/file.csv")
| (beam.ParDo(BuildAdsRecordFn()))
| WriteToBigQuery('ads_table',dataset='dds',project='doubleclick-2',schema=ads_schema)
其中:'doubleclick-2' 和 'dds' 是现有的项目和数据集,ads_schema 定义如下:
ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'
BuildAdsRecordFn() 定义如下:
class AdsRecord:
dict = {}
def __init__(self, line):
record = line.split(",")
self.dict['Advertiser_ID'] = record[0]
self.dict['Campaign_ID'] = record[1]
self.dict['Ad_ID'] = record[2]
self.dict['Ad_Name'] = record[3]
self.dict['Click_through_URL'] = record[4]
self.dict['Ad_Type'] = record[5]
class BuildAdsRecordFn(beam.DoFn):
def __init__(self):
super(BuildAdsRecordFn, self).__init__()
def process(self, element):
text_line = element.strip()
ads_record = AdsRecord(text_line).dict
return ads_record
但是,当我运行管道时,出现以下错误:
"dataflow_job_18146703755411620105-B" failed., (6c011965a92e74fa): BigQuery job "dataflow_job_18146703755411620105-B" in project "doubleclick-2" finished with error(s): errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0: Value encountered without start of object
这是我使用的示例测试数据:
100001,1000011,10000111,ut,https://bloomberg.com/aliquam/lacus/morbi.xml,Brand-neutral
100001,1000011,10000112,eu,http://weebly.com/sed/vel/enim/sit.jsp,Dynamic Click
我是数据流和 python 的新手,所以无法弄清楚上面的代码中可能有什么问题。非常感谢任何帮助!
最佳答案
我刚刚实现了您的代码,但效果不佳,但我收到了一条不同的消息错误(类似于“您无法返回 dict
作为 的结果ParDo
").
这段代码对我来说正常工作,请注意不仅我没有使用类属性 dict
而且现在返回了一个列表:
ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'
class BuildAdsRecordFn(beam.DoFn):
def __init__(self):
super(BuildAdsRecordFn, self).__init__()
def process(self, element):
text_line = element.strip()
ads_record = self.process_row(element)
return ads_record
def process_row(self, row):
dict_ = {}
record = row.split(",")
dict_['Advertiser_ID'] = int(record[0]) if record[0] else None
dict_['Campaign_ID'] = int(record[1]) if record[1] else None
dict_['Ad_ID'] = int(record[2]) if record[2] else None
dict_['Ad_Name'] = record[3]
dict_['Click_through_URL'] = record[4]
dict_['Ad_Type'] = record[5]
return [dict_]
with beam.Pipeline() as p:
(p | ReadFromText("gs://bucket/file.csv")
| beam.Filter(lambda x: x[0] != 'A')
| (beam.ParDo(BuildAdsRecordFn()))
| WriteToBigQuery('ads_table', dataset='dds',
project='doubleclick-2', schema=ads_schema))
#| WriteToText('test.csv'))
这是我模拟的数据:
Advertiser_ID,Campaign_ID,Ad_ID,Ad_Name,Click_through_URL,Ad_Type
1,1,1,name of ad,www.url.com,sales
1,1,2,name of ad2,www.url2.com,sales with sales
我还过滤掉了我在我的文件中创建的标题行(在 Filter
操作中),如果你没有标题那么这不是必需的
关于python - 如何使用 python 将字典写入数据流中的 Bigquery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47379190/
我只想国家和资本化的值(value)。 这是我的完整代码: cities = { 'rotterdam': { 'country': 'netherlands',
想更好地了解如何比较对象类型的键。 dicOverall.exists(dic2) 返回 False,而 dicOverall.exists(dic1) 返回 True。我不太确定 .Exists 如
我是编程和 python 的新手,我不知道如何解决这个问题。 my_dict = {'tiger': ['claws', 'sharp teeth', 'four legs', 'stripes'
这个问题已经有答案了: Accessing an object property with a dynamically-computed name (19 个回答) 已关闭 8 年前。 我引用了这篇文
希望有人能帮忙。我正在使用 Python,我希望能够执行以下操作。 我有一组对象(例如形状)和一系列作用于这些对象的命令。命令的格式为命令字符串,后跟可变数量的参数,可以是字符串或整数 例如形状“矩形
我在文件中保存了一本字典。我从 python 交互式 shell 将字典加载到内存中,我的系统监视器显示 python 进程消耗了 4GB。以下命令提供以下输出: size1 = sys.getsiz
如果我运行以下代码: import json foo = [ { "name": "Bob", "occupation": "", "stand
我尝试获取列名及其索引,并将结果保存为数据框或字典: df <- data.frame(a=rnorm(10), b=rnorm(10), c=rnorm(10)) 我该怎么做?谢谢。 column
我正在尝试获取输入,如果字典 logins 有一个与我的输入匹配的键,我想返回该键的值。 logins = { 'admin':'admin', 'turtle':'password1
在 Perl 世界中有一个很棒的东西叫做 CPAN .它是开源 Perl 库的大型存储。 我使用来自 CPAN 的模块,我已经发布了 several distributions myself . 我使
这个问题已经有答案了: Is there a Python dict without values? (3 个回答) 已关闭 3 年前。 我有一个问题,我想跟踪大量值。如果我从未遇到过该值,我将执行操
想知道这是否可能。 我们有一个第 3 方库,其中包含有关用户的识别信息... 与库的主要交互是通过一个以字符串为键的 HashTable,并返回该键的信息对象图。 问题是, key 显然是区分大小写的
我是 .NET 编程的新手。对不起,如果这个问题以前被问过。 我目前正在学习 F#。 Dictionary、Hashtable 和 Map 之间有什么区别?我应该什么时候使用? 我还有一个标题中没有提
我正在尝试使用SVM进行3类分类。为此,我正在SVM培训期间准备词汇表。但是,由于我在SVM预测期间获得随机结果,因此我怀疑我的词汇创建方法中存在一些问题。我创建词汇的代码如下: //Mat trai
就目前情况而言,这个问题不太适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、民意调查或扩展讨论。如果您觉得这个问题可以改进并可能重新开放,visit
假设我有一个以下形式的嵌套字典: {'geo': {'bgcolor': 'white','lakecolor': 'white','caxis': {'gridcolor': 'white', 'l
我有一个 java 应用程序,每秒启动和停止数亿个项目(从外部脚本调用)多次。 Input: String key Output: int value 此应用程序的目的是在从未永远改变的Map(约30
我正在尝试找出字典与集合和数组相比的相对优势和功能。 我发现了一篇很棒的文章here但找不到一个简单的表格来比较所有不同的功能。 有人知道吗? 最佳答案 请参阅下表,对集合和字典进行有用的比较。 (该
我想要一个字典,它可以为字典中没有的任何键返回一个指定的值,例如: var dict = new DictWithDefValues("not specified"); dict.Add("bob78
我是 python 新手,目前仍在学习如何处理列表和字典。 我有这两个功能 def food_database(item_name, size_serv, calorie_serv, prot
我是一名优秀的程序员,十分优秀!