gpt4 book ai didi

scala - 有没有办法在 map 期间跳过/抛出/忽略 Spark 中的记录?

转载 作者:行者123 更新时间:2023-12-04 01:59:15 25 4
gpt4 key购买 nike

我们有一个非常标准的 Spark 作业,它从 s3 读取日志文件,然后对它们进行一些处理。非常基本的 Spark 东西...

val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
...and continue processing

哪里 OurRowObject.parseLine获取原始日志行并将其映射到一些(键,值)对(例如 ( (1,2,3,4), (5,6,7) ),然后我们可以对其进行处理。现在,如果 parseLine 遇到“问题”日志(格式错误、空等)... ) 它将返回一些标记值(例如 ( ("ERROR", ...), (...) ),然后过滤步骤将其过滤掉。

现在,我一直试图找到一种方法,就是在 map 中不包含问题行……某种方式告诉 spark“嘿,这是一个空行/格式不正确的行,跳过它,不要” t 包括一对”,而不是额外的过滤步骤。

我还没有找到一种方法来做到这一点,并且发现这个功能不存在 (AFAICanFind) 非常有趣。

谢谢

最佳答案

您可以让解析器返回 Option[Value] 而不是 Value。这样你就可以使用 flatMap 将行映射到行并删除那些无效的行。

在粗略的线条是这样的:

def parseLog(line:String):Option[Array[String]] = {
val splitted = log.split("\t")
if (validate(splitted)) Some(splitted) else None
}

val validRows = logs.flatMap(OurRowObject.parseLog(_))

关于scala - 有没有办法在 map 期间跳过/抛出/忽略 Spark 中的记录?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26766026/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com