gpt4 book ai didi

apache-spark - Pyspark:spark-submit 不像 CLI 那样工作

转载 作者:行者123 更新时间:2023-12-04 12:39:13 24 4
gpt4 key购买 nike

我有一个 pyspark 来从 TSV 文件加载数据并将其保存为 Parquet 文件以及将其保存为持久性 SQL 表。

当我通过 pyspark CLI 逐行运行它时,它的工作方式与预期完全一样。当我使用 spark-submit 作为应用程序运行它时,它运行时没有任何错误,但我得到了奇怪的结果:1. 数据被覆盖而不是附加。 2. 当我对它运行 SQL 查询时,即使 Parquet 文件有几 GB 大小(我所期望的),也没有返回任何数据。有什么建议?

代码:

from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
StructField('subscriberId', StringType(), True),
StructField('sourceIPv4Address', StringType(), True),
StructField('destinationIPv4Address', StringType(), True),
StructField('service',StringType(), True),
StructField('baseService',StringType(), True),
StructField('serverHostname', StringType(), True),
StructField('rat', StringType(), True),
StructField('userAgent', StringType(), True),
StructField('accessPoint', StringType(), True),
StructField('station', StringType(), True),
StructField('device', StringType(), True),
StructField('contentCategories', StringType(), True),
StructField('incomingOctets', LongType(), True),
StructField('outgoingOctets', LongType(), True),
StructField('incomingShapingDrops', IntegerType(), True),
StructField('outgoingShapingDrops', IntegerType(), True),
StructField('qoeIncomingInternal', DoubleType(), True),
StructField('qoeIncomingExternal', DoubleType(), True),
StructField('qoeOutgoingInternal', DoubleType(), True),
StructField('qoeOutgoingExternal', DoubleType(), True),
StructField('incomingShapingLatency', DoubleType(), True),
StructField('outgoingShapingLatency', DoubleType(), True),
StructField('internalRtt', DoubleType(), True),
StructField('externalRtt', DoubleType(), True),
StructField('HttpUrl',StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv',sep='\t',header=True,schema=schema,timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date',to_date('time_stamp'))
df.write.saveAsTable('test2',mode='append',partitionBy='date',path=parquet_dir)

最佳答案

正如@user8371915 建议的那样,它类似于:

Spark can access Hive table from pyspark but not from spark-submit

我需要更换

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)


from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

这解决了这个问题。

关于apache-spark - Pyspark:spark-submit 不像 CLI 那样工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50470847/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com