gpt4 book ai didi

apache-spark - 使用套接字的 Spark Structured Streaming,设置 SCHEMA,在控制台中显示 DATAFRAME

转载 作者:行者123 更新时间:2023-12-05 01:19:08 26 4
gpt4 key购买 nike

如何在 PySpark 中为流式传输 DataFrame 设置架构。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
.readStream\
.format('socket')\
.option('host', '192.168.0.113')\
.option('port', 5560)\
.load()

例如我需要一个像这样的表:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456
Jack, Ma, 789456
....

如何将 header /架构设置为 ['Name','lastName','PhoneNumber']与他们的数据类型。

此外,是否可以连续显示此表,或者说 DataFrame 的前 20 行。当我尝试它时,我得到了错误

"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"

最佳答案

TextSocketSource 不提供任何集成的解析选项。只能使用以下两种格式之一:

  • 时间戳和文本,如果 includeTimestamp 设置为 true,具有以下架构:

    StructType([
    StructField("value", StringType()),
    StructField("timestamp", TimestampType())
    ])
  • 仅当 includeTimestamp 设置为 false 且架构如下所示时才显示文本:

    StructType([StructField("value", StringType())]))

如果您想更改此格式,您必须转换流以提取感兴趣的字段,例如使用正则表达式:

from pyspark.sql.functions import regexp_extract
from functools import partial

fields = partial(
regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

lines.select(
fields(idx=1).alias("name"),
fields(idx=2).alias("last_name"),
fields(idx=3).alias("phone_number")
)

关于apache-spark - 使用套接字的 Spark Structured Streaming,设置 SCHEMA,在控制台中显示 DATAFRAME,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41378447/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com