gpt4 book ai didi

java - 如何在流批流连接中定义连接条件?

转载 作者:行者123 更新时间:2023-12-02 09:13:56 25 4
gpt4 key购买 nike

我正在使用带有 java 1.8 的 Spark-sql-2.4.1v。和kafka版本spark-sql-kafka-0-10_2.11_2.4.3。

我正在尝试将静态数据帧(即元数据)与另一个流数据帧连接起来,如下所示:

 Dataset<Row> streamingDs  = //read from kafka topic
Dataset<Row> staticDf= //read from oracle meta-data table.


Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
"c.code = i.industry_code"
);

即使我在数据框中有相应的列数据,它也会给出以下错误。

线程“main”org.apache.spark.sql.AnalysisException 中出现异常:无法在联接的左侧解析 USING 列c.code = i.industry_code。左侧列:[id, transasctionDate, companyName,code];

我尝试如下:

Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
"c.code = i.industry_code",
"inner"
);

这会产生以下错误:

The method join(Dataset, String) in the type Dataset is not applicable for the arguments (Dataset, String, String)

最佳答案

tl;dr c.code = i.industry_code 被视为要连接的列的名称(而不是连接表达式)。

<小时/>

将代码更改为如下:

streamingDs.as("c").join(staticDf.as("i")) // INNER JOIN is the default
.where("c.code = i.industry_code")

关于java - 如何在流批流连接中定义连接条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59159303/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com