gpt4 book ai didi

java - 如何将流数据与 Dataflow/Beam 中的大型历史数据集相结合

转载 作者:行者123 更新时间:2023-12-04 02:24:27 29 4
gpt4 key购买 nike

我正在调查通过 Google Dataflow/Apache Beam 处理来自网络用户 session 的日志,并且需要将用户的日志(流式传输)与上个月的用户 session 历史记录结合起来。

我研究了以下方法:

  • 使用 30 天固定窗口:最有可能是一个很大的窗口以适应内存,而且我不需要更新用户的历史记录,只需引用
  • 使用 CoGroupByKey 连接两个数据集,但两个数据集必须具有相同的窗口大小( https://cloud.google.com/dataflow/model/group-by-key#join ),这在我的情况下(24 小时与 30 天)不正确
  • 使用 Side Input 检索给定 element 的用户 session 历史记录在 processElement(ProcessContext processContext)

  • 我的理解是通过 .withSideInputs(pCollectionView)加载的数据需要适应内存。我知道我可以将单个用户的所有 session 历史记录放入内存中,但不能将所有 session 历史记录放入内存中。

    我的问题是是否有办法从仅与当前用户 session 相关的侧输入加载/流式传输数据?

    我正在想象一个 parDo 函数,它将通过指定用户的 ID 从侧面输入加载用户的历史 session 。但只有当前用户的历史 session 才能放入内存;通过侧输入加载所有历史 session 会太大。

    一些伪代码来说明:
    public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
    this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
    Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

    final LogLine currentLogLine = processContext.element();
    final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
    final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
    processContext.output(outputMetric);
    }
    }

    最佳答案

    目前还没有一种方法可以访问流中的每个键侧输入,但它肯定会如您所描述的那样有用,而且我们正在考虑实现它。

    一种可能的解决方法是使用侧输入来分发指向实际 session 历史记录的指针。生成 24 小时 session 历史的代码可以将它们上传到 GCS/BigQuery/等,然后将位置作为附加输入发送到加入代码。

    关于java - 如何将流数据与 Dataflow/Beam 中的大型历史数据集相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36927558/

    29 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com