- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 AWS Glue 将 csv 数据从 AWS S3 移动到 AWS Redshift。我正在移动的数据使用非标准格式来记录每个条目的时间戳(例如 01-JAN-2020 01.02.03),因此我的胶水爬虫将此列作为字符串拾取。
在我的作业脚本中,我通过使用 pyspark 中的“to_timestamp”函数将此列转换为时间戳,这似乎工作正常。但是,因此,数据类型为“long”的列不会传输到 redshift,并且这些列的值都为空。
当我在不转换时间戳列(即仅生成的脚本)的情况下运行我的脚本时,数据类型为“long”的列没有这个问题,它们正确地出现在 redshift 中。
这是我的代码:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp, col
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0")
# Convert to data frame and perform ETL
dataFrame = datasource0.toDF().withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"),"dd-MMM-yyyy HH.mm.ss"))
# Convert back to a dynamic frame
editedData = DynamicFrame.fromDF(dataFrame, glueContext, "editedData")
## @type: ApplyMapping
## @args: [mapping = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = editedData, mappings = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
我在这里明显遗漏了什么吗?非常感谢!
编辑:
运行 editedData.PrintSchema() 后显示的模式是:
|-- rec_open_ts: timestamp |-- chg_id: struct | |-- long: long | |-- string: string |-- rec_seq_num: struct | |-- long: long | |-- string: string |-- imsi: struct | |-- long: long | |-- string: string |-- msisdn: struct | |-- long: long | |-- string: string |-- terminal_ip_address: string |-- pdp_type: struct | |-- long: long | |-- string: string |-- ggsn_ip_address: string |-- sgsn_ip_address: string |-- country: string |-- operator: string |-- apn: string |-- duration: struct | |-- long: long | |-- string: string |-- record_close_cause_code: struct | |-- long: long | |-- string: string |-- uploaded_data(b): struct | |-- long: long | |-- string: string |-- downloaded_data(b): struct | |-- long: long | |-- string: string
(多头是结构的一部分?)
运行 editedData.Show(10) 后,显示应该存在于 redshift 中的数据。其中一个长列的示例:
"chg_id": {"long": 123456789, "string": null}
编辑 2:
在没有 ETL 的情况下运行 datasource0.printSchema() 后(时间戳保留为字符串),架构为:
|-- rec_open_ts: string |-- chg_id: choice | |-- long | |-- string |-- rec_seq_num: choice | |-- long | |-- string |-- imsi: choice | |-- long | |-- string |-- msisdn: choice | |-- long | |-- string |-- terminal_ip_address: string |-- pdp_type: choice | |-- long | |-- string |-- ggsn_ip_address: string |-- sgsn_ip_address: string |-- country: string |-- operator: string |-- apn: string |-- duration: choice | |-- long | |-- string |-- record_close_cause_code: choice | |-- long | |-- string |-- uploaded_data(b): choice | |-- long | |-- string |-- downloaded_data(b): choice | |-- long | |-- string
似乎当我转换时间戳列时,长列变成了结构。这是为什么?
最佳答案
对于遇到此问题的任何其他人,我已经找到了解决方案:
当一个类型不明确时(即在这种情况下,爬虫推断出一个 long 但该列中有一个值不是 long),该类型被标记为推断类型和字符串之间的选择。如果歧义没有解决,当从动态框架转换为数据框架时,这个选择会变成一个结构,并且不会在 redshift 中正确显示。
因此,在执行任何 ETL 之前,我使用“resolveChoice”方法解决了这些选择。这是我更新的代码:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp, col
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0")
# Resolve type choices
resolvedData = datasource0.resolveChoice(specs = [('chg_id','cast:long')]).resolveChoice(specs = [('rec_seq_num','cast:long')]).resolveChoice(specs = [('imsi','cast:long')]).resolveChoice(specs = [('msisdn','cast:long')]).resolveChoice(specs = [('pdp_type','cast:long')]).resolveChoice(specs = [('duration','cast:long')]).resolveChoice(specs = [('record_close_cause_code','cast:long')]).resolveChoice(specs = [('uploaded_data(b)','cast:long')]).resolveChoice(specs = [('downloaded_data(b)','cast:long')])
# Convert to data frame and perform ETL
dataFrame = resolvedData.toDF().withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"),"dd-MMM-yyyy HH.mm.ss"))
# Convert back to a dynamic frame
editedData = DynamicFrame.fromDF(dataFrame, glueContext, "editedData")
print("Printed Schema")
editedData.printSchema()
## @type: ApplyMapping
## @args: [mapping = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = editedData, mappings = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()
关于amazon-web-services - 在 AWS Glue 中转换其他列的数据类型时,某些列变为空,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62409328/
我正在 csv 上使用 hadoop 来分析一些数据。我使用sql/mysql(不确定)来分析数据,现在陷入了僵局。 我花了好几个小时在谷歌上搜索,却没有找到任何相关的东西。我需要一个查询,在该查询中
我正在为 Bootstrap 网格布局的“简单”任务而苦苦挣扎。我希望在大视口(viewport)上有 4 列,然后在中型设备上有 2 列,最后在较小的设备上只有 1 列。 当我测试我的代码片段时,似
对于这个令人困惑的标题,我深表歉意,我想不出这个问题的正确措辞。相反,我只会给你背景信息和目标: 这是在一个表中,一个人可能有也可能没有多行数据,这些行可能包含相同的 activity_id 值,也可
具有 3 列的数据库表 - A int , B int , C int 我的问题是: 如何使用 Sequelize 结果找到 A > B + C const countTasks = await Ta
我在通过以下功能编写此查询时遇到问题: 首先按第 2 列 DESC 排序,然后从“不同的第 1 列”中选择 只有 Column1 是 DISTINCT 此查询没有帮助,因为它首先从第 1 列中进行选择
使用 Bootstrap 非常有趣和有帮助,目前我在创建以下需求时遇到问题。 “使用 bootstrap 在桌面上有 4 列,在平板电脑上有 2 列,在移动设备上有 1 列”谁能告诉我正确的结构 最佳
我是 R 新手,正在问一个非常基本的问题。当然,我在尝试从所提供的示例中获取指导的同时做了功课here和 here ,但无法在我的案例中实现这个想法,即可能是由于我的问题中的比较维度更大。 我的实
通常我会使用 R 并执行 merge.by,但这个文件似乎太大了,部门中的任何一台计算机都无法处理它! (任何从事遗传学工作的人的附加信息)本质上,插补似乎删除了 snp ID 的 rs 数字,我只剩
我有一个 df , delta1 delta2 0 -1 2 0 -1 0 0 0 我想知道如何分配 delt
您好,我想知道是否可以执行以下操作。显然,我已经尝试在 phpMyAdmin 中运行它,但出现错误。也许还有另一种方式来编写此查询。 SELECT * FROM eat_eat_restaurants
我有 2 个列表(标题和数据值)。我想要将数据值列 1 匹配并替换为头文件列 1,以获得与 dataValue 列 1 和标题值列 2 匹配的值 头文件 TotalLoad,M0001001 Hois
我有两个不同长度的文件,file2 是一个很大的引用文件,我从中提取文件 1 的数据。 我有一行 awk,我通常会对其进行调整以在我的文件中进行查找和替换,但它总是在同一列中进行查找和替换。 所以对于
假设我有两个表,如下所示。 create table contract( c_ID number(1) primary key, c_name varchar2(50) not
我有一个带有 varchar 列的 H2 表,其检查约束定义如下: CONSTRAINT my_constraint CHECK (varchar_field <> '') 以下插入语句失败,但当我删
这是最少量的代码,可以清楚地说明我的问题: One Two Three 前 2 个 div 应该是 2 个左列。第三个应该占据页面的其余部分。最后,我将添加选项来隐藏和
在 Azure 中的 Log Analytics 中,我为 VM Heartbeat 选择一个预定义查询,我在编辑器中运行查询正常,但当我去创建警报时,我不断收到警报“查询未返回 TimeGenera
在 Azure 中的 Log Analytics 中,我为 VM Heartbeat 选择一个预定义查询,我在编辑器中运行查询正常,但当我去创建警报时,我不断收到警报“查询未返回 TimeGenera
今天我开始使用 JexcelApi 并遇到了这个:当您尝试从特定位置获取元素时,不是像您通常期望的那样使用sheet.getCell(row,col),而是使用sheet.getCell(col,ro
我有一个包含 28 列的数据库。第一列是代码,第二列是名称,其余是值。 public void displayData() { con.Open(); MySqlDataAdapter
我很沮丧:每当我缩小这个网页时,一切都变得一团糟。我如何将网页居中,以便我可以缩小并且元素不会被错误定位。 (它应该是 2 列,但所有内容都合并为 1)我试过 但由于某种原因,这不起作用。 www.o
我是一名优秀的程序员,十分优秀!