
python - Efficient string suffix detection

Reposted. Author: 太空狗. Updated: 2023-10-29 21:23:20

I'm using PySpark to process a huge dataset, and I want to filter a dataframe based on strings in another dataframe. For example,

from pyspark.sql.types import StringType

dd = spark.createDataFrame(["something.google.com", "something.google.com.somethingelse.ac.uk", "something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).toDF('domains')
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com                    |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy                   |
|something.good.com.cy.mal.org           |
+----------------------------------------+

dd1 = spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+

I assume that domains and gooddomains are valid domain names.

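What I want to do is filter out the strings in dd that end with one of the domains in dd1, keeping only those that don't. So in the example above, I want to filter out rows 1 and 3, ending up with: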

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
+----------------------------------------+
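To pin down the intended semantics, here is a minimal plain-Python sketch, assuming both lists fit in memory (the helper names are hypothetical, not from the question). A domain counts as whitelisted if it equals a good domain or ends with "." followed by one; the leading dot matters, since without it a name like badgoogle.com would match google.com. This is the brute-force version that compares every domain with every whitelist entry, which is what a join on a LIKE condition amounts to.

```python
def is_whitelisted(domain, whitelist):
    # Exact match, or a subdomain: the suffix must start on a label boundary
    return any(domain == good or domain.endswith("." + good) for good in whitelist)


def filter_out_whitelisted(domains, whitelist):
    # Keep only the domains NOT covered by any whitelist entry
    return [d for d in domains if not is_whitelisted(d, whitelist)]


dd = [
    "something.google.com",
    "something.google.com.somethingelse.ac.uk",
    "something.good.com.cy",
    "something.good.com.cy.mal.org",
]
dd1 = ["google.com", "good.com.cy"]

print(filter_out_whitelisted(dd, dd1))
# ['something.google.com.somethingelse.ac.uk', 'something.good.com.cy.mal.org']
```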

My current solution (shown below) only handles domains of up to 3 "words" (labels). If I were to add, say, verygood.co.ac.uk to dd1 (i.e. the whitelist), it would fail.

from pyspark.sql import functions as F

def split_filter(x, whitelist):
    # Split each domain into its dot-separated labels
    splitted1 = x.select(F.split(x['domains'], r'\.').alias('splitted_domains'))
    n = F.size(splitted1.splitted_domains)
    # Last two labels, e.g. "google.com"
    last_two = splitted1.select(F.concat(splitted1.splitted_domains[n - 2],
                                         F.lit('.'),
                                         splitted1.splitted_domains[n - 1]).alias('last_two'))
    # Last three labels, e.g. "good.com.cy"
    last_three = splitted1.select(F.concat(splitted1.splitted_domains[n - 3],
                                           F.lit('.'),
                                           splitted1.splitted_domains[n - 2],
                                           F.lit('.'),
                                           splitted1.splitted_domains[n - 1]).alias('last_three'))
    # Attach row ids so the three frames can be joined back together
    x = x.withColumn('id', F.monotonically_increasing_id())
    last_two = last_two.withColumn('id', F.monotonically_increasing_id())
    last_three = last_three.withColumn('id', F.monotonically_increasing_id())
    final_d = x.join(last_two, ['id']).join(last_three, ['id'])
    # Drop rows whose two- or three-label suffix is whitelisted
    df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['gooddomains'], how='left_anti')
    df2 = df1.join(whitelist, df1['last_three'] == whitelist['gooddomains'], how='left_anti')
    return df2.drop('id')
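The three-label limit comes from building only the last two and last three labels. A sketch that handles any depth, assuming the whitelist fits in a Python set (`label_suffixes` and `is_whitelisted` are hypothetical helpers): enumerate every suffix that starts on a label boundary (a domain with k labels has k of them) and test each one against the set, so the cost is one lookup per label instead of one comparison per whitelist entry.

```python
def label_suffixes(domain):
    # "a.b.c" -> ["a.b.c", "b.c", "c"]: every suffix starting on a label boundary
    labels = domain.split(".")
    return [".".join(labels[i:]) for i in range(len(labels))]


def is_whitelisted(domain, whitelist_set):
    # One set lookup per label, however many labels the whitelist entries have
    return any(s in whitelist_set for s in label_suffixes(domain))


whitelist = {"google.com", "good.com.cy", "verygood.co.ac.uk"}

print(label_suffixes("x.verygood.co.ac.uk"))
# ['x.verygood.co.ac.uk', 'verygood.co.ac.uk', 'co.ac.uk', 'ac.uk', 'uk']
print(is_whitelisted("x.verygood.co.ac.uk", whitelist))  # True
print(is_whitelisted("something.good.com.cy.mal.org", whitelist))  # False
```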

I'm using Spark 2.3.0 and Python 2.7.5.

Best answer

Let's extend the example for better coverage:

domains = spark.createDataFrame([
    "something.google.com",  # OK
    "something.google.com.somethingelse.ac.uk",  # NOT OK
    "something.good.com.cy",  # OK
    "something.good.com.cy.mal.org",  # NOT OK
    "something.bad.com.cy",  # NOT OK
    "omgalsogood.com.cy",  # NOT OK
    "good.com.cy",  # OK
    "sogood.example.com",  # OK: matches the shorter, redundant entry, not the longer one
    "notsoreal.googleecom"  # NOT OK
], "string").toDF('domains')

good_domains = spark.createDataFrame([
    "google.com", "good.com.cy", "alsogood.com.cy",
    "good.example.com", "example.com"  # Redundant case
], "string").toDF('gooddomains')

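Now... a naive solution, using only Spark SQL primitives, is to simplify your current approach a bit. Since you've stated that it is safe to assume these are valid public domains, we can define a function like this: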

from pyspark.sql.functions import col, regexp_extract

def suffix(c):
    return regexp_extract(c, "([^.]+\\.[^.]+$)", 1)
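To see what `suffix` returns without a cluster, the same pattern can be run with Python's `re` module. This is a sketch mirroring `regexp_extract`, which yields an empty string when the pattern does not match; the name `suffix_py` is hypothetical.

```python
import re

# Same pattern as in suffix(): the last two dot-separated labels
SUFFIX_RE = re.compile(r"([^.]+\.[^.]+$)")


def suffix_py(s):
    m = SUFFIX_RE.search(s)
    # regexp_extract returns "" when there is no match
    return m.group(1) if m else ""


print(suffix_py("something.google.com.somethingelse.ac.uk"))  # ac.uk
print(suffix_py("notsoreal.googleecom"))  # notsoreal.googleecom
print(repr(suffix_py("localhost")))  # '' (no dot, so no match)
```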

to extract the top-level domain and the first-level subdomain:

domains_with_suffix = (domains
    .withColumn("suffix", suffix("domains"))
    .alias("domains"))
good_domains_with_suffix = (good_domains
    .withColumn("suffix", suffix("gooddomains"))
    .alias("good_domains"))

domains_with_suffix.show()
+--------------------+--------------------+
|             domains|              suffix|
+--------------------+--------------------+
|something.google.com|          google.com|
|something.google....|               ac.uk|
|something.good.co...|              com.cy|
|something.good.co...|             mal.org|
|something.bad.com.cy|              com.cy|
|  omgalsogood.com.cy|              com.cy|
|         good.com.cy|              com.cy|
|  sogood.example.com|         example.com|
|notsoreal.googleecom|notsoreal.googleecom|
+--------------------+--------------------+

Now we can do a (left) outer join:

from pyspark.sql.functions import (
    col, concat, lit, monotonically_increasing_id, sum as sum_
)

candidates = (domains_with_suffix
    .join(
        good_domains_with_suffix,
        col("domains.suffix") == col("good_domains.suffix"),
        "left"))

and filter the results:

is_good_expr = (
    col("good_domains.suffix").isNotNull() &  # Match on suffix
    (
        # Exact match
        (col("domains") == col("gooddomains")) |
        # Subdomain match
        col("domains").endswith(concat(lit("."), col("gooddomains")))
    )
)

not_good_domains = (candidates
    .groupBy("domains")  # .groupBy("suffix", "domains") - see the discussion
    .agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))
    .filter(~col("any_good"))
    .drop("any_good"))

not_good_domains.show(truncate=False)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
|something.good.com.cy.mal.org           |
|something.google.com.somethingelse.ac.uk|
|something.bad.com.cy                    |
+----------------------------------------+
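The join-then-aggregate logic can be mirrored in plain Python to check the expected output. A sketch, assuming the whitelist fits in memory: a dict keyed by suffix plays the role of the (broadcast) join side, and `any()` plays the role of `sum(...) > 0` in the aggregation. All names are hypothetical, and plain lists stand in for the DataFrames.

```python
import re
from collections import defaultdict

SUFFIX_RE = re.compile(r"([^.]+\.[^.]+$)")


def suffix_py(s):
    m = SUFFIX_RE.search(s)
    return m.group(1) if m else ""


def not_good(domains, good_domains):
    # "Join side": good domains grouped by their two-label suffix
    by_suffix = defaultdict(list)
    for g in good_domains:
        by_suffix[suffix_py(g)].append(g)
    result = []
    for d in domains:
        # Only entries sharing the suffix are compared (the join condition)
        candidates = by_suffix.get(suffix_py(d), [])
        # Equivalent of any_good: exact match or subdomain match
        if not any(d == g or d.endswith("." + g) for g in candidates):
            result.append(d)
    return result


sample_domains = [
    "something.google.com", "something.google.com.somethingelse.ac.uk",
    "something.good.com.cy", "something.good.com.cy.mal.org",
    "something.bad.com.cy", "omgalsogood.com.cy", "good.com.cy",
    "sogood.example.com", "notsoreal.googleecom",
]
sample_good = ["google.com", "good.com.cy", "alsogood.com.cy",
               "good.example.com", "example.com"]

print(not_good(sample_domains, sample_good))
```

The printed list contains the same five domains as the Spark output, in input order.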

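This is better than the Cartesian product required for a direct join with LIKE, but it is unsatisfactory as brute force, and in the worst case it requires two shuffles: one for the join (which can be skipped if good_domains is small enough to be broadcast) and another for the group_by + agg.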

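Unfortunately, Spark SQL doesn't allow a custom partitioner to use only one shuffle for both (this is possible, however, with a composite key in the RDD API), and the optimizer is not yet smart enough to optimize join(_, "key1").groupBy("key1", _).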

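If you can accept some false negatives, you can go probabilistic. First, let's build a probabilistic counter (here using bounter, with a little help from toolz):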

from pyspark.sql.functions import concat_ws, reverse, split
from bounter import bounter
from toolz.curried import identity, partition_all

# This is only for testing on toy examples, in practice use a more realistic value
size_mb = 20
chunk_size = 100

def reverse_domain(c):
    # "a.b.c" -> "c.b.a"; note that reverse() on arrays requires Spark 2.4+
    return concat_ws(".", reverse(split(c, "\\.")))

def merge(acc, xs):
    acc.update(xs)
    return acc

counter = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # Chunk data into groups so we reduce the number of update calls
    .mapPartitions(partition_all(chunk_size))
    # Use tree aggregate to reduce pressure on the driver,
    # when the number of partitions is large*
    # You can use the depth parameter for further tuning
    .treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))
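`reverse_domain` reverses the label order so that a domain suffix becomes a string prefix: good.com.cy turns into cy.com.good, which is a prefix of cy.com.good.something. A plain-Python (Python 3) sketch of the same transformation and of the prefix enumeration the UDF performs with `toolz.accumulate`; the function names are hypothetical. Note that `itertools.accumulate` takes the function as its second argument, while `toolz.accumulate` takes it first.

```python
from itertools import accumulate


def reverse_domain_py(domain):
    # Same idea as reverse_domain(): "a.b.c" -> "c.b.a"
    return ".".join(reversed(domain.split(".")))


def reversed_prefixes(reversed_domain):
    # "cy.com.good" -> ["cy", "cy.com", "cy.com.good"]; each one is a suffix of the original name
    return list(accumulate(reversed_domain.split("."),
                           lambda acc, label: acc + "." + label))


r = reverse_domain_py("something.good.com.cy")
print(r)  # cy.com.good.something
print(reversed_prefixes(r))
# ['cy', 'cy.com', 'cy.com.good', 'cy.com.good.something']
```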

Next, define a user-defined function like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from toolz import accumulate

def is_good_counter(counter):
    def is_good_(x):
        # x is a reversed domain; check every label-boundary prefix against the counter
        return any(
            prefix in counter.value
            for prefix in accumulate(lambda acc, label: "{}.{}".format(acc, label), x.split("."))
        )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)
    return _

and filter:

domains.filter(
~is_good_counter(counter)(reverse_domain("domains"))
).show(truncate=False)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+
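bounter's count-min sketch is only one choice of probabilistic structure. To illustrate the trade-off, here is a minimal Bloom filter in pure Python 3, a swapped-in stand-in rather than bounter's API, used with the same prefix check on reversed domains. A Bloom filter never misses a key it stored, but it can report a key it never saw; here such a false positive would mark a bad domain as good and drop it from the result, which is the false negative accepted above. The size `m` and hash count `k` are arbitrary illustrative values.

```python
import hashlib
from itertools import accumulate


class BloomFilter:
    """Minimal Bloom filter: m bits, k bit positions derived from one SHA-256 digest."""

    def __init__(self, m=1 << 16, k=4):
        self.m, self.k = m, k
        self.bits = bytearray(m // 8)

    def _positions(self, key):
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        # Slice the 32-byte digest into k 4-byte integers (works for k <= 8)
        for i in range(self.k):
            yield int.from_bytes(digest[4 * i:4 * i + 4], "big") % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def __contains__(self, key):
        # All k bits set: possibly a false positive, never a false negative
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


def reverse_domain_py(domain):
    return ".".join(reversed(domain.split(".")))


def is_good(bloom, domain):
    # Probe every reversed prefix, i.e. every label-boundary suffix of the domain
    labels = reverse_domain_py(domain).split(".")
    return any(p in bloom for p in accumulate(labels, lambda a, b: a + "." + b))


bloom = BloomFilter()
for g in ["google.com", "good.com.cy", "alsogood.com.cy", "good.example.com", "example.com"]:
    bloom.add(reverse_domain_py(g))

print(is_good(bloom, "something.google.com"))  # True: stored keys are always found
print(is_good(bloom, "omgalsogood.com.cy"))  # expected False, though a false positive is possible
```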

In Scala this could be done with bloomFilter:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter
import spark.implicits._  // for the $"..." syntax (already imported in spark-shell)

def reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\\.")))

val checker = good_domains.stat.bloomFilter(
  // Adjust values depending on the data
  reverseDomain($"gooddomains"), 1000, 0.001
)

def isGood(checker: BloomFilter) = udf((s: String) =>
  s.split('.').toStream.scanLeft("") {
    case ("", x) => x
    case (acc, x) => s"${acc}.${x}"
  }.tail.exists(checker mightContain _))


domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

If needed, it shouldn't be hard to call such code from Python.

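Due to its approximate nature, this may still not be fully satisfactory. If you need exact results, you can try to exploit the redundant nature of the data, for example with a trie (here using the datrie implementation).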

If good_domains is relatively small, you can create a single model, in a similar way to the probabilistic variant:

import string
import datrie

def seq_op(acc, x):
    acc[x] = True
    return acc

def comb_op(acc1, acc2):
    acc1.update(acc2)
    return acc1

trie = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # string.printable is a bit excessive if you need standard domain names
    # and not enough if you allow internationalized domain names.
    # In the latter case you'll have to adjust the `alphabet`
    # or use a different implementation of trie.
    .treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))

Define the user-defined function:

def is_good_trie(trie):
    def is_good_(x):
        if not x:
            return False
        else:
            # A stored prefix counts only if it ends on a label boundary
            return any(
                x == match or x[len(match)] == "."
                for match in trie.value.iter_prefixes(x)
            )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)

    return _
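The `is_good_` check can be tried without datrie. Below, a dict-of-dicts character trie in plain Python stands in for datrie (this is not datrie's API); `iter_prefixes` here is a hypothetical helper that yields the stored keys that are prefixes of the query, the role `trie.value.iter_prefixes(x)` plays above. The "." boundary test is the same as in `is_good_`.

```python
END = object()  # marker stored at nodes that complete a key


def trie_insert(trie, key):
    node = trie
    for ch in key:
        node = node.setdefault(ch, {})
    node[END] = True


def iter_prefixes(trie, s):
    # Walk s character by character, yielding every stored key that is a prefix of s
    node = trie
    for i, ch in enumerate(s):
        node = node.get(ch)
        if node is None:
            return
        if END in node:
            yield s[:i + 1]


def reverse_domain_py(domain):
    return ".".join(reversed(domain.split(".")))


def is_good_trie_py(trie, x):
    # x is a reversed domain; a match must end exactly at x's end or at a "."
    if not x:
        return False
    return any(x == m or x[len(m)] == "." for m in iter_prefixes(trie, x))


trie = {}
for g in ["google.com", "good.com.cy", "alsogood.com.cy", "good.example.com", "example.com"]:
    trie_insert(trie, reverse_domain_py(g))

print(is_good_trie_py(trie, reverse_domain_py("something.google.com")))  # True
print(is_good_trie_py(trie, reverse_domain_py("omgalsogood.com.cy")))  # False
print(is_good_trie_py(trie, reverse_domain_py("notsoreal.googleecom")))  # False
```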

and apply it to the data:

domains.filter(
~is_good_trie(trie)(reverse_domain("domains"))
).show(truncate=False)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

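This particular approach assumes that all good_domains can be compressed into a single trie, but it can easily be extended to handle cases where this assumption does not hold. For example, you can build one trie per top-level domain or suffix (as defined in the naive solution):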

(good_domains
    .select(suffix("gooddomains"), reverse_domain("gooddomains"))
    .rdd
    .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))

Then either load the models on demand from a serialized version, or use RDD operations.
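The per-suffix variant is a two-level lookup: first pick the model for the domain's suffix, then run the exact check only against that group. A plain-Python 3 sketch, assuming each group fits in memory; sets of reversed domains stand in for the per-key datrie models that `aggregateByKey` would produce, and all names are hypothetical.

```python
import re
from collections import defaultdict
from itertools import accumulate

SUFFIX_RE = re.compile(r"([^.]+\.[^.]+$)")


def suffix_py(s):
    m = SUFFIX_RE.search(s)
    return m.group(1) if m else ""


def reverse_domain_py(domain):
    return ".".join(reversed(domain.split(".")))


def build_models(good_domains):
    # Analogue of aggregateByKey: one model (here a set) per two-label suffix
    models = defaultdict(set)
    for g in good_domains:
        models[suffix_py(g)].add(reverse_domain_py(g))
    return models


def is_good(models, domain):
    model = models.get(suffix_py(domain))
    if not model:
        return False  # no whitelist entry shares this suffix
    labels = reverse_domain_py(domain).split(".")
    # Every reversed prefix ends on a label boundary, so exact set lookups suffice
    return any(p in model for p in accumulate(labels, lambda a, b: a + "." + b))


models = build_models(["google.com", "good.com.cy", "alsogood.com.cy",
                       "good.example.com", "example.com"])
print(sorted(models))  # ['com.cy', 'example.com', 'google.com']
print(is_good(models, "sogood.example.com"))  # True
print(is_good(models, "something.bad.com.cy"))  # False
```

Only the model for the matching suffix has to be loaded, which is what makes on-demand loading of serialized models practical.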

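Both non-native methods can be further tuned depending on the data, business requirements (such as tolerance for false negatives in the approximate solution) and available resources (driver memory, executor memory, cardinality of the suffixes, access to a POSIX-compliant distributed file system, and so on). There are also some trade-offs to consider when choosing between applying these to DataFrames or to RDDs (memory usage, communication and serialization overhead).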


* See Understanding treeReduce() in Spark

For a similar question about python - efficient string suffix detection, see Stack Overflow: https://stackoverflow.com/questions/54481681/
