- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 delta lake 运行 Pyspark,但是当我尝试导入 delta 模块时,我得到一个 ModuleNotFoundError: No module named 'delta'
.这是在一台没有互联网连接的机器上,所以我不得不从 Maven 手动下载 delta-core jar。并将其放入 %SPARK_HOME%/jars
文件夹。
我的程序运行没有任何问题,我能够从 delta lake 写入和读取,所以我很高兴我得到了正确的 jar。但是当我尝试导入增量模块时 from delta.tables import *
我收到错误。
有关信息,我的代码是:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, FloatType, StructType, StructField
from pyspark.sql.functions import input_file_name
from Constants import Constants
if __name__ == "__main__":
constants = Constants()
spark = SparkSession.builder.master("local[*]")\
.appName("Delta Lake Testing")\
.getOrCreate()
# have to start spark session before importing: https://docs.delta.io/latest/quick-start.html#python
from delta.tables import *
# set logging level to limit output
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.session.timeZone", "UTC")
# push additional python files to the worker nodes
base_path = os.path.abspath(os.path.dirname(__file__))
spark.sparkContext.addPyFile(os.path.join(base_path, 'Constants.py'))
# start pipeline
schema = StructType([StructField("Timestamp", TimestampType(), False),\
StructField("ParamOne", FloatType(), False),\
StructField("ParamTwo", FloatType(), False),\
StructField("ParamThree", FloatType(), False)])
df = spark.readStream\
.option("header", "true")\
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\
.schema(schema)\
.csv(constants.input_path)\
.withColumn("input_file_name", input_file_name())
df.writeStream\
.format("delta")\
.outputMode("append")\
.option("checkpointLocation", constants.checkpoint_location)\
.start("/tmp/bronze")
# await on stream
sqm = spark.streams
sqm.awaitAnyTermination()
这是使用 Spark v2.4.4 和 Python v3.6.1,作业是使用 spark-submit path/to/job.py
提交的
最佳答案
%pyspark
sc.addPyFile("**LOCATION_OF_DELTA_LAKE_JAR_FILE**")
from delta.tables import *
关于apache-spark - 导入 Pyspark Delta Lake 模块时找不到模块错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62326402/
我想使用 spark sql 在 Delta 表中添加一些列,但它显示如下错误: ALTER ADD COLUMNS does not support datasource table with ty
在增量表中存储我的数据帧时,为我的数据帧寻找有效的分区策略。 我当前的数据帧 1.5000.000 rowa 将数据从数据帧移动到增量表需要 3.5 小时。 为了寻找更有效的写作方式,我决定尝试将我的
我想知道,是否可以更新增量表分区列的“值”? 该表按特定列分区,现在我想更新该特定列的值。我可以这样做吗? (在 slack 上找到) 最佳答案 使用 replaceWhere 选项。 引用官方文档
考虑排序数组a: a = np.array([0, 2, 3, 4, 5, 10, 11, 11, 14, 19, 20, 20]) 如果我指定左右增量, delta_left, delta_righ
当我们运行 VACUUM 命令时,它是遍历每个 parquet 文件并删除每条记录的旧版本,还是保留所有 parquet 文件,即使它有一个最新版本的记录?压实呢?这有什么不同吗? 最佳答案 Vacu
如果我想使用 delta time-travel 来比较两个版本以获得类似于 CDC 的更改,该怎么做? 我可以看到两个选项: 在 SQL 中,您有 EXCEPT/MINUS 查询,您可以将所有数据与
我想在 python 中对给定的输入和输出数据进行敏感性分析。输入参数的设计是基于拉丁超立方体的,所以我决定使用SALib的delta模块。我找不到一些文档,返回参数 delta、delta_conf
我正在尝试在 CUDA 中实现前馈神经网络。到目前为止,我用过 Jeff Heaton's YouTube videos作为推断算法和实现它们的指南。我不清楚一件事: 希顿在他的 Gradient C
我正在阅读下面关于 First Search Program - Artificial Intelligence for Robotics 的代码,我对下面这两行的工作稍作停留: x2 = x+del
我将一年以上的行作为增量表归档到 ADLSv2 中,当需要报告该数据时,我需要将归档数据与本地数据库中现有的一些表连接起来。有没有一种方法可以在不从云中重新水化或将数据水化到云的情况下进行连接? 最佳
Delta Lake 在哪里存储表元数据信息。我在我的独立机器上使用 spark 2.6(不是 Databricks)。我的假设是,如果我重新启动 spark,将删除在 delta lake spar
我按照@提到的步骤操作:http://wiki.apache.org/solr/DataImportHandler 我还尝试了来自 stackoverflow 的其他解决方案,但仍然无法正常工作。 问
是否可以在本地实现三角洲湖?如果是,需要安装什么软件/工具? 我正在尝试在内部实现一个 delta 湖来分析一些日志文件和数据库表。我当前的机器装有 ubuntu,apache spark。不确定还需
Delta Lake 每 10 个版本自动创建一个检查点。有没有办法手动创建检查点? 最佳答案 import org.apache.spark.sql.delta.DeltaLog DeltaLog.
虽然分析似乎无法避免存储到“delta”的值不被读取...我的循环的哪一部分不起作用,为什么? #include #include int main() { float a, b, c;
不幸的是,我认为错误并不是让他自动更新了delta 我在“数据库”中有这个表插件 # in MySQL CREATE TABLE sph_counter ( counter_id INTEGER PR
是否可以使用 Delta Live Tables 来执行增量批处理? 现在,我相信这段代码将始终在运行管道时加载目录中的所有可用数据, CREATE LIVE TABLE lendingclub_ra
我有一个包含数百万行和多个不同类型的列的增量表,包括。嵌套结构。我想在运行时创建增量表的空 DataFrame 克隆 - 即相同的模式,没有行。 我可以读取架构而不读取表的任何内容吗(这样我就可以基于
我有一些历史期权价格,我正在尝试确定隐含的 delta。 我有: 1) strike 2) call/put 3) stock price 4) dividend 5) interest rate 6
梯度下降和 delta 规则有什么区别? 最佳答案 没有数学:delta 规则使用梯度下降来最小化感知器网络权重的误差。 梯度下降是一种通用算法,它逐渐改变参数向量以最小化目标函数。它通过向阻力最小的
我是一名优秀的程序员,十分优秀!