- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 Delta 表,我正在读取它作为 StreamingQuery。
使用 DESCRIBE History
查看增量表历史,我看到 99% 的 OperationMetrics 表明 numTargetRowsUpdates 为 0
,大多数操作都是插入。但是,偶尔会有 2-3 个 numTargetRowsUpdates > 1。Delta 表上的操作不过是合并。
我是否仍可以使用 StreamingQuery 并将此数据作为流读取,否则我会出错吗?。即:
df: DataFrame = spark \
.readStream \
.format("delta") \
.load(f"{table_location}") \
df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", f "{checkpoint}/{table_location}")\
.trigger(once=True) \
.foreachBatch(process_batch) \
.start()
现在我有另一个 Delta 表,它更像是客户信息的维度表,即电子邮件、姓名、上次上线时间等。我最初将其作为附加的 StreamingQuery 阅读,但出现以下错误:java.lang.UnsupportedOperationException: Detected a data update
查看此表,在描述历史记录中,我看到发生了许多更新。问题:如果我将 StreamQuery 与 IgnoreChanges, True
一起使用,这是否会将更新的记录作为新记录发送,我可以在 foreachBatch 中进一步处理?
最佳答案
如果增量源中有更新或删除,读取流将抛出异常。这从 databricks documentation: 中也很清楚。
Structured Streaming does not handle input that is not an append andthrows an exception if any modifications occur on the table being usedas a source.
如果您使用 IgnoreChanges, True
,它不会抛出异常,但会为您提供更新的行 + 可能已经处理过的行。这是因为增量表中的所有内容都发生在文件级别。例如,如果您更新文件中的一行(大致),将发生以下情况:
文档中也提到了这一点。
ignoreChanges: re-process updates if files had to be rewritten in thesource table due to a data changing operation such as UPDATE, MERGEINTO, DELETE (within partitions), or OVERWRITE. Unchanged rows maystill be emitted, therefore your downstream consumers should be ableto handle duplicates. ...
您必须决定这是否适合您的用例。如果您需要专门处理更新和删除数据 block 提供 Change Data Feed ,您可以在增量表上启用它。这会为您提供有关插入、追加和删除的行级详细信息(以一些额外的存储和 IO 为代价)。
关于pyspark - Databricks 中的 StreamingQuery 增量表 - 描述历史,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71866652/
我在数据框中有一列月份数字,想将其更改为月份名称,所以我使用了这个: df['monthName'] = df['monthNumber'].apply(lambda x: calendar.mont
Pyspark 中是否有一个 input() 函数,我可以通过它获取控制台输入。如果是,请详细说明一下。 如何在 PySpark 中编写以下代码: directory_change = input("
我们正在 pyspark 中构建数据摄取框架,并想知道处理数据类型异常的最佳方法是什么。基本上,我们希望有一个拒绝表来捕获所有未与架构确认的数据。 stringDf = sparkSession.cr
我正在开发基于一组 ORC 文件的 spark 数据框的 sql 查询。程序是这样的: from pyspark.sql import SparkSession spark_session = Spa
我有一个 Pyspark 数据框( 原始数据框 )具有以下数据(所有列都有 字符串 数据类型): id Value 1 103 2
我有一台配置了Redis和Maven的服务器 然后我执行以下sparkSession spark = pyspark .sql .SparkSession .builder .master('loca
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有: +---+-------+-------+ | id| var1| var2| +---+-------+-------+ | a|[1,2,3]|[1,2,3]| | b|[2,
从一些简短的测试来看,pyspark 数据帧的列删除功能似乎不区分大小写,例如。 from pyspark.sql import SparkSession from pyspark.sql.funct
我有一个带有多个数字列的 pyspark DF,我想为每一列根据每个变量计算该行的十分位数或其他分位数等级。 这对 Pandas 来说很简单,因为我们可以使用 qcut 函数为每个变量创建一个新列,如
我有以下使用 pyspark.ml 包进行线性回归的代码。但是,当模型适合时,我在最后一行收到此错误消息: IllegalArgumentException: u'requirement failed
我有一个由 | 分隔的平面文件(管道),没有引号字符。示例数据如下所示: SOME_NUMBER|SOME_MULTILINE_STRING|SOME_STRING 23|multiline text
给定如下模式: root |-- first_name: string |-- last_name: string |-- degrees: array | |-- element: struc
我有一个 pyspark 数据框如下(这只是一个简化的例子,我的实际数据框有数百列): col1,col2,......,col_with_fix_header 1,2,.......,3 4,5,.
我有一个数据框 +------+--------------------+-----------------+---- | id| titulo |tipo | formac
我从 Spark 数组“df_spark”开始: from pyspark.sql import SparkSession import pandas as pd import numpy as np
如何根据行号/行索引值删除 Pyspark 中的行值? 我是 Pyspark(和编码)的新手——我尝试编码一些东西,但它不起作用。 最佳答案 您不能删除特定的列,但您可以使用 filter 或其别名
我有一个循环生成多个因子表的输出并将列名存储在列表中: | id | f_1a | f_2a | |:---|:----:|:-----| |1 |1.2 |0.95 | |2 |0.7
我正在尝试将 hql 脚本转换为 pyspark。我正在努力如何在 groupby 子句之后的聚合中实现 case when 语句的总和。例如。 dataframe1 = dataframe0.gro
我想添加新的 2 列值服务 arr 第一个和第二个值 但我收到错误: Field name should be String Literal, but it's 0; production_targe
我是一名优秀的程序员,十分优秀!