
python - Converting a dataframe from Pandas to PySpark with the right data types in Foundry

Reposted · Author: 行者123 · Updated: 2023-12-01 15:35:10

For those working in a Foundry environment: I am trying to build a pipeline in Code Repositories to process a raw dataset (coming from an Excel file) into a clean dataset that I will later analyze in Contour.
To do this I am using Python, except that the pipeline seems to run on PySpark, and at some point I have to convert the dataset I cleaned with pandas into a PySpark dataset. This is where I am stuck.

I have looked at several Stack Overflow posts about converting a pandas DataFrame to a PySpark DataFrame, but none of the solutions has worked so far.
Whenever I try to run the conversion, the data types fail to convert even though I enforce a schema.

The Python part of the code has already been tested successfully in Spyder (importing and exporting the Excel files) and gives the expected results. It only fails, somehow, when I need to convert to PySpark.

@transform_pandas(
    Output("/MDM_OUT_OF_SERVICE_EVENTS_CLEAN"),
    OOS_raw=Input("/MDM_OUT_OF_SERVICE_EVENTS"),
)
def DA_transform(OOS_raw):

    ''' Code Section in Python '''

    mySchema = StructType([
        StructField(OOS_dup.columns[0], IntegerType(), True),
        StructField(OOS_dup.columns[1], StringType(), True),
        ...])

    OOS_out = sqlContext.createDataFrame(OOS_dup, schema=mySchema,
                                         verifySchema=False)

    return OOS_out

At some point, I get this error message:
AttributeError: 'unicode' object has no attribute 'toordinal'.

According to this post: What is causing 'unicode' object has no attribute 'toordinal' in pyspark?

this happens because PySpark fails to convert the data to its date type.

But the data is datetime64[ns] in pandas. I have already tried converting this column to string and to integer, but that fails as well.
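For illustration, one common workaround for this kind of error is to normalize the datetime64[ns] column on the pandas side before handing the frame to Spark, either as ISO date strings or as plain python date objects. This is a sketch with invented sample data, not the actual Foundry dataset:

```python
import pandas as pd

# Invented sample column standing in for OOS_DATE
df = pd.DataFrame({"OOS_DATE": pd.to_datetime(["2019-01-15", "2019-03-02"])})

# Option 1: ISO-formatted strings, which Spark can later cast to DateType
df["OOS_DATE_STR"] = df["OOS_DATE"].dt.strftime("%Y-%m-%d")

# Option 2: python datetime.date objects, which Spark maps to DateType directly
df["OOS_DATE"] = df["OOS_DATE"].dt.date

print(df["OOS_DATE_STR"].tolist())  # ['2019-01-15', '2019-03-02']
```

Either representation avoids passing raw datetime64[ns] values through createDataFrame.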

Here is a picture of the Python output dataset (screenshot not reproduced here).

And here are the data types pandas reports after cleaning the dataset:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4972 entries, 0 to 4971
Data columns (total 51 columns):
OOS_ID 4972 non-null int64
OPERATOR_CODE 4972 non-null object
ATA_CAUSE 4972 non-null int64
EVENT_CODE 3122 non-null object
AC_MODEL 4972 non-null object
AC_SN 4972 non-null int64
OOS_DATE 4972 non-null datetime64[ns]
AIRPORT_CODE 4915 non-null object
RTS_DATE 4972 non-null datetime64[ns]
EVENT_TYPE 4972 non-null object
CORRECTIVE_ACTION 417 non-null object
DD_HOURS_OOS 4972 non-null float64
EVENT_DESCRIPTION 4972 non-null object
EVENT_CATEGORY 4972 non-null object
ATA_REPORTED 324 non-null float64
TOTAL_CAUSES 4875 non-null float64
EVENT_NUMBER 3117 non-null float64
RTS_TIME 4972 non-null object
OOS_TIME 4972 non-null object
PREV_REPORTED 4972 non-null object
FERRY_IND 4972 non-null object
REPAIR_STN_CODE 355 non-null object
MAINT_DOWN_TIME 4972 non-null float64
LOGBOOK_RECORD_IDENTIFIER 343 non-null object
RTS_IND 4972 non-null object
READY_FOR_USE 924 non-null object
DQ_COMMENTS 2 non-null object
REVIEWED 5 non-null object
DOES_NOT_MEET_SPECS 4 non-null object
CORRECTED 12 non-null object
EDITED_BY 4972 non-null object
EDIT_DATE 4972 non-null datetime64[ns]
OUTSTATION_INDICATOR 3801 non-null object
COMMENT_TEXT 11 non-null object
ATA_CAUSE_CHAPTER 4972 non-null int64
ATA_CAUSE_SECTION 4972 non-null int64
ATA_CAUSE_COMPONENT 770 non-null float64
PROCESSOR_COMMENTS 83 non-null object
PARTS_AVAIL_AT_STATION 4972 non-null object
PARTS_SHIPPED_AT_STATION 4972 non-null object
ENGINEER_AT_STATION 4972 non-null object
ENGINEER_SENT_AT_STATION 4972 non-null object
SOURCE_FILE 4972 non-null object
OOS_Month 4972 non-null float64
OOS_Hour 4972 non-null float64
OOS_Min 4972 non-null float64
RTS_Month 4972 non-null float64
RTS_Hour 4972 non-null float64
RTS_Min 4972 non-null float64
OOS_Timestamp 4972 non-null datetime64[ns]
RTS_Timestamp 4972 non-null datetime64[ns]
dtypes: datetime64[ns](5), float64(12), int64(5), object(29)

Best Answer

In case it helps some of you, I found guidance in the official Foundry documentation on how to properly convert between pandas and PySpark DataFrames.

OOS_dup is the pandas dataframe I want to convert back to Spark.

import pandas as pd
import pyspark.sql.functions as F

# Extract the name of each column together with its data type in pandas
col = OOS_dup.columns
col_type = list()

for c in col:
    t = OOS_dup[c].dtype.name
    col_type.append(t)

df_schema = pd.DataFrame({"field": col, "data_type": col_type})

# Convert to a Spark dataframe first; the helper functions below use the
# Spark column API (withColumn / cast), so they must run on a Spark dataframe
OOS_dup = sqlContext.createDataFrame(OOS_dup)

# Define a function to replace missing ("NaN") cells with null
def replace_missing(df, col_names):
    for col in col_names:
        df = df.withColumn(col, F.when(df[col] == "NaN", None).otherwise(df[col]))
    return df

# Replace missing values
OOS_dup = replace_missing(OOS_dup, col)

# Define a function to cast each column to the proper type in Spark,
# pairing each column with its recorded pandas dtype
def change_type(df, col_names, dtypes):
    for col, dtype in zip(col_names, dtypes):
        if dtype == "float64":
            df = df.withColumn(col, df[col].cast("double"))
        elif dtype == "int64":
            df = df.withColumn(col, df[col].cast("int"))
        elif dtype == "datetime64[ns]":
            df = df.withColumn(col, df[col].cast("date"))
        else:
            df = df.withColumn(col, df[col].cast("string"))
    return df

# Cast each column to the proper data type
OOS_dup = change_type(OOS_dup, df_schema["field"], df_schema["data_type"])
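The dtype-to-Spark-type mapping buried inside change_type can also be pulled out into a tiny standalone helper, which makes the conversion rules easy to check without a Spark session. The name pandas_to_spark_type is made up for this sketch:

```python
# Map a pandas dtype name to the Spark SQL type name used when casting.
def pandas_to_spark_type(dtype_name):
    mapping = {
        "float64": "double",
        "int64": "int",
        "datetime64[ns]": "date",
    }
    # Any other pandas dtype (e.g. "object") falls back to a string cast
    return mapping.get(dtype_name, "string")

print(pandas_to_spark_type("datetime64[ns]"))  # date
print(pandas_to_spark_type("object"))          # string
```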

Regarding "python - Converting a dataframe from Pandas to PySpark with the right data types in Foundry", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57894967/
