gpt4 book ai didi

caching - 在 PySpark 环境中创建缓存的最佳方式

转载 作者:行者123 更新时间:2023-12-04 12:49:16 26 4
gpt4 key购买 nike

我正在使用 Spark Streaming 创建一个系统来丰富来自 Cloudant 数据库的传入数据。例子 -

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}

我的驱动程序类代码如下:

from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

from kafka import KafkaConsumer, KafkaProducer
import json

class SampleFramework():

def __init__(self):
pass

@staticmethod
def messageHandler(m):
return json.loads(m.message)

@staticmethod
def processData(rdd):

if (rdd.isEmpty()):
print("RDD is Empty")
return

# Expand
expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)

# Score
scored_rdd = expanded_rdd.map(FunctionJob.function)

# Publish RDD


def run(self, ssc):

self.ssc = ssc

directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
{"metadata.broker.list": META,
"bootstrap.servers": SERVER}, \
messageHandler= SampleFramework.messageHandler)

directKafkaStream.foreachRDD(SampleFramework.processData)

ssc.start()
ssc.awaitTermination()

浓缩作业的代码如下:
类 EnrichmentJob:

cache = {}

@staticmethod
def enrich(data):

# Assume that Cloudant Connector using the available config
cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"])
final_data = []
for row in data:
id = row["id"]
if(id not in EnrichmentJob.cache.keys()):
data = cloudantConnector.getOne({"id": id})
row["data"] = data
EnrichmentJob.cache[id]=data
else:
data = EnrichmentJob.cache[id]
row["data"] = data
final_data.append(row)

cloudantConnector.close()

return final_data

我的问题是 - 有没有办法维护 [1]“所有工作人员都可以访问的主内存上的全局缓存”或 [2]“每个工作人员上的本地缓存,以便它们保持在 foreachRDD 设置中”?

我已经探索了以下内容 -
  • 广播变量 - 在这里我们采用 [1] 方式。据我了解,它们是只读的和不可变的。我已经检查了这个 reference但它引用了一个非持久化/持久化广播变量的例子。这是一个好习惯吗?
  • 静态变量 - 在这里我们采用 [2] 方式。被引用的类(在本例中为“Enricher”)以静态变量字典的形式维护缓存。但结果是 ForEachRDD 函数为每个传入的 RDD 生成一个全新的进程,这删除了之前启动的静态变量。这是上面编码的一个。

  • 我现在有两种可能的解决方案 -
  • 在文件系统上维护离线缓存。
  • 在我的驱动程序节点上完成此扩充任务的整个计算。这将导致整个数据最终在驱动程序上并在那里维护。缓存对象将作为映射函数的参数发送到扩充作业。

  • 显然,第一个看起来比第二个更好,但我想得出结论,在 promise 之前,这两个是唯一的方法。任何指针将不胜感激!

    最佳答案

    Is there someway to maintain [1]"a global cache on the main memory that is accessible to all workers"



    不。没有所有 worker 都可以访问的“主内存”。每个 worker 运行在一个单独的进程中,并通过套接字与外部世界进行通信。更不用说非本地模式下不同物理节点之间的分离了。

    有一些技术可用于实现具有内存映射数据的工作范围缓存(使用 SQLite 是最简单的一种),但需要一些额外的努力来实现正确的方式(避免冲突等)。

    or [2]"local caches on each of the workers such that they remain persisted in the foreachRDD setting"?



    您可以使用范围仅限于单个工作进程的标准缓存技术。根据配置(静态与 dynamic resource allocationspark.python.worker.reuse ),它可能会或可能不会在多个任务和批次之间保留。

    考虑以下简化示例:
  • map_param.py :

    from pyspark import AccumulatorParam
    from collections import Counter

    class CounterParam(AccumulatorParam):
    def zero(self, v: Counter) -> Counter:
    return Counter()

    def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter:
    acc1.update(acc2)
    return acc1
  • my_utils.py :

    from pyspark import Accumulator
    from typing import Hashable
    from collections import Counter

    # Dummy cache. In production I would use functools.lru_cache
    # but it is a bit more painful to show with accumulator
    cached = {}

    def f_cached(x: Hashable, counter: Accumulator) -> Hashable:
    if cached.get(x) is None:
    cached[x] = True
    counter.add(Counter([x]))
    return x


    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable:
    counter.add(Counter([x]))
    return x
  • main.py :

    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext

    from counter_param import CounterParam
    import my_utils

    from collections import Counter

    def main():
    sc = SparkContext("local[1]")
    ssc = StreamingContext(sc, 5)

    cnt_cached = sc.accumulator(Counter(), CounterParam())
    cnt_uncached = sc.accumulator(Counter(), CounterParam())

    stream = ssc.queueStream([
    # Use single partition to show cache in work
    sc.parallelize(data, 1) for data in
    [[1, 2, 3], [1, 2, 5], [1, 3, 5]]
    ])

    stream.foreachRDD(lambda rdd: rdd.foreach(
    lambda x: my_utils.f_cached(x, cnt_cached)))
    stream.foreachRDD(lambda rdd: rdd.foreach(
    lambda x: my_utils.f_uncached(x, cnt_uncached)))

    ssc.start()
    ssc.awaitTerminationOrTimeout(15)
    ssc.stop(stopGraceFully=True)

    print("Counter cached {0}".format(cnt_cached.value))
    print("Counter uncached {0}".format(cnt_uncached.value))

    if __name__ == "__main__":
    main()

  • 示例运行:

    bin/spark-submit main.py

    Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1})
    Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2})

    如您所见,我们得到了预期的结果:
  • 对于“缓存”对象,每个工作进程(分区)的每个唯一键仅更新一次累加器。
  • 对于未缓存的对象,每次出现键时都会更新累加器。
  • 关于caching - 在 PySpark 环境中创建缓存的最佳方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41398242/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com