gpt4 book ai didi

apache-spark - PySpark 如何将 CSV 读入 Dataframe 并对其进行操作

转载 作者:行者123 更新时间:2023-12-03 09:24:37 26 4
gpt4 key购买 nike

我对 pyspark 很陌生,我试图用它来处理一个保存为 csv 文件的大型数据集。
我想将 CSV 文件读入 Spark 数据框,删除一些列,然后添加新列。
我该怎么做?

我无法将这些数据放入数据框中。这是我迄今为止所拥有的精简版本:

def make_dataframe(data_portion, schema, sql):
fields = data_portion.split(",")
return sql.createDateFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
sc = SparkContext(appName="Test")
sql = SQLContext(sc)

...

big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
.reduce(lambda a, b: a.union(b))

big_frame.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<...>") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("append") \
.save()

sc.stop()

这会产生错误 TypeError: 'JavaPackage' object is not callable在减少步骤。

是否有可能做到这一点?减少到数据帧的想法是能够将结果数据写入数据库(Redshift,使用 spark-redshift 包)。

我也试过使用 unionAll() , 和 map()partial()但无法让它工作。

我正在亚马逊的 EMR 上运行它,使用 spark-redshift_2.10:2.0.0 ,以及亚马逊的 JDBC 驱动程序 RedshiftJDBC41-1.1.17.1017.jar .

最佳答案

更新 - 在评论中回答您的问题:

从 CSV 读取数据到数据框:
看来您只是尝试将 CSV 文件读入 Spark 数据帧。

如果是这样 - 我的回答是:https://stackoverflow.com/a/37640154/5088142覆盖这个。

以下代码应将 CSV 读入 spark-data-frame

import pyspark
sc = pyspark.SparkContext()
sql = SQLContext(sc)

df = (sql.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("/path/to_csv.csv"))

// these lines are equivalent in Spark 2.0 - using [SparkSession][1]
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv")
spark.read.option("header", "true").csv("/path/to_csv.csv")

下降列

您可以使用“drop(col)”删除列
https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

下降(列)
Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]

添加栏目
您可以使用“withColumn”
https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn(colName, col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters:

colName – string, name of the new column.
col – a Column expression for the new column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]

注意:spark 有很多其他可以使用的功能(例如,您可以使用“select”而不是“drop”)

关于apache-spark - PySpark 如何将 CSV 读入 Dataframe 并对其进行操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40327859/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com