gpt4 book ai didi

java - Apache Flink 表查询结果作为字符串值

转载 作者:行者123 更新时间:2023-12-01 16:28:44 25 4
gpt4 key购买 nike

我正在从 flink 表 api 编写查询来检索记录。然后检查是否找到记录,如果找到,则获取每个记录的列值的字符串值。

users: 
|id | name | phone |
|---|------|-------|
| 01| sam | 23354 |
| 02| jake | 23352 |
| 03| kim | 23351 |

问题是 flink 仅从查询返回表,因此我无法 1:检查是否找到记录以及 2:获取找到的记录值的各个值

须藤代码:

foundRecord = find record by phone
if foundRecord {
create new instance of Visitor
Visitor.name = foundRecord.name
Visitor.id = foundRecord.id
} else {
throw exception
}

flink 文档推荐的下面的代码为我提供了一个表,但不确定如何实现上述 sudo 代码,因为它作为另一个表返回,而我需要实际的记录值。

Table users = registeredUsers.select("id, name, phone").where("phone === '23354'"));

Flink 文档引用:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#expression-syntax

最佳答案

为了知道找不到匹配的记录,输入必须有界 - 因此我们将使用 BatchTableEnvironment,而不是 StreamTableEnvironment。 (通过流式输入,匹配的记录最终可能到达并满足查询。只有通过批量输入,我们才能证明不存在匹配。)

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.util.Collector

class MissingResultException() extends Exception {}

object Phone {
case class Visitor(name: String, id: String)

@throws[Exception]
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)

val rawInput = env.fromElements(
("01", "sam", "23354"),
("02", "jake", "23352"),
("03", "kim", "23351"))

val events = tableEnv.fromDataSet(rawInput, 'id, 'name, 'phone)
tableEnv.registerTable("events", events)
val resultTable = tableEnv
.from("events")
.select('id, 'name, 'phone)
.where("phone === 'missing'")

val results = resultTable.toDataSet[Row]

results
.map(row => new Visitor(row.getField(1).toString, row.getField(0).toString))
.print

val count: DataSet[Long] = env.fromElements(results.count())

count
.flatMap(new FlatMapFunction[Long, Collector[Long]]{

override def flatMap(x: Long, collector: Collector[Collector[Long]]): Unit = {
if (x == 0L) {
throw new MissingResultException
}
}})

.print()
}
}

我用来检测结果集为空的方法感觉像是黑客,但我想不出更好的方法。请注意,最后的 print() 是必要的,尽管没有任何内容可打印,因为任何最终未馈送到接收器的计算都将被优化掉,并且不会执行。

关于java - Apache Flink 表查询结果作为字符串值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62094093/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com