- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我想不断详细说明数据集流的行(最初由 Kafka 发起):基于我想更新 Radis 哈希的条件。这是我的代码片段(lastContacts
是前一个命令的结果,它是这种类型的流:org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]
。这扩展为 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
):
class MyStreamProcessor extends ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
}
override def close(errorOrNull: Throwable): Unit = {}
}
val query = lastContacts
.writeStream
.foreach(new MyStreamProcessor())
.start()
query.awaitTermination()
我收到一个巨大的堆栈跟踪,相关部分(我认为)是这样的:java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter
谁能解释为什么会出现这种异常以及如何避免?谢谢!
这个问题与以下两个有关:
最佳答案
Spark 上下文不可序列化。
ForeachWriter 的任何实现都必须是可序列化的,因为每个任务都将获得所提供对象的新序列化-反序列化副本。因此,强烈建议在调用 open(...) 方法后完成写入数据的任何初始化(例如打开连接或启动事务),这表示任务已准备好生成数据。
在您的代码中,您试图在 process 方法中使用 spark 上下文,
override def process(record: Row) = {
val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
*sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
}
要向redis发送数据,需要自己创建连接并在open方法中打开,然后在process方法中使用。
看看如何创建redis连接池。 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
关于apache-spark - Azure DataBricks Stream foreach 因 NotSerializableException 而失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55362469/
我已经开始阅读 Databricks 推出的 Unity Catalog。我了解它试图解决的基本问题,但我不了解目录到底是什么。 这在 Databricks 文档中可用, A catalog cont
我正在努力了解 Databricks。 我发现文档逐步从 S3 或 Azure Datalake 导入数据,然后输出到 Azure Synapse Analytics 或其他数据仓库解决方案。 快速播
我想以编程方式将(Python Wheel)库添加到 /Shared Databricks 上的工作区。在 GUI(工作区 > 导入 > 库)中很容易做到,但我无法弄清楚如何在 Databricks
我正在创建一个带有公司 Logo 的 databricks 笔记本模板。使用以下代码显示图像会引发错误。 代码: %md 错误: HTTP ERROR 403: Invalid or missing
我将使用这张图片来形象化我的问题: Databricks1 在 Databricks 中创建数据库(和表)并将其数据存储在存储帐户中。在Databricks2中我想读取数据:Databricks2只有
有没有办法通过 python 笔记本确定现有的 Azure Databricks Secret Scope 是否由 Key Vault 或 Databricks 支持? dbutils.secrets
我正在尝试连接到 Databricks 上的 Spark 集群,并且正在学习本教程:https://docs.databricks.com/dev-tools/dbt.html .我安装了 dbt-d
我们可以使用Autoloader跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载文件的列表? 我可以在 AWS Gl
我们可以使用一些帮助来了解如何将 Spark Driver 和 worker 日志发送到 Azure Databricks 之外的目的地,例如Azure Blob 存储或使用 Eleastic-bea
将我的 Azure Databricks 从标准升级到主要,尝试开始使用 Databricks Delta: create table t using delta as select * from t
现在,databricks 自动加载器需要一个目录路径,从中加载所有文件。但是,如果其他类型的日志文件也开始进入该目录 - 有没有办法让 Autoloader 在准备数据帧时排除这些文件? df =
有人可以让我知道如何使用 databricks dbutils 从文件夹中删除所有文件。 我尝试了以下但不幸的是,Databricks 不支持通配符。 dbutils.fs.rm('adl://azu
我是 azure 的新手和databricks ,我学会了如何安装 blob 和利用,但我有一些疑问,而且我还没有找到任何文档的任何答案。所以请帮我解释一下: dbutils.fs.mount(
尝试遍历已安装的 Databricks 卷中的目录时遇到 ClassCastException。 java.lang.ClassCastException: com.databricks.backen
尝试遍历已安装的 Databricks 卷中的目录时遇到 ClassCastException。 java.lang.ClassCastException: com.databricks.backen
我正在运行 Databricks Community Edition,我想从以下 mnt 目录中删除文件 /mnt/driver-daemon/jars 我运行 dbutils 命令: dbutils
我已经在我的机器上创建了“.netrc”文件并尝试在 databricks rest api 调用下面。但它总是给出未经授权的错误。如何在 Databricks 中创建 .netrc 文件? curl
没有意识到 shift+enter 运行一个单元格。我正在写一个 delete from table 并按下 shift enter 删除了表中的所有数据。 最佳答案 在 Delta Lake 表中,
我需要访问 Azure Files来自 Azure Databricks .根据文档 Azure Blobs受支持,但我需要此代码来处理 Azure 文件: dbutils.fs.mount( s
我正在尝试使用服务主体从 Databricks 连接到 Synapse。 我已经在集群配置中配置了服务主体 fs.azure.account.auth.type..dfs.core.windows.n
我是一名优秀的程序员,十分优秀!