gpt4 book ai didi

stream - 卡夫卡流 - 如何分组两次?

转载 作者:行者123 更新时间:2023-12-03 23:17:11 26 4
gpt4 key购买 nike

我想创建一个条形图,显示图像中有多少像素颜色;图像每 3 秒更新一次,因此我的条形图也会更新。

我有一个收集 JSON 对象的主题,该对象将其作为图像创建日期的键,并且该值是十六进制值(例如 #FFF)。

我想按键分组,所以它按图像分组,然后按每个组的十六进制值分组并执行 .count()。

你是怎样做的?

我在想streams.groupByKey()...然后groupBy按十六进制值但我需要将KTable转换为KStream...

更新

抱歉,我在手机上打字时缺乏解释。
我会试着再次解释。

顺便说一下,我改变了一些东西。如果你想阅读我在做什么,这是我的 github:https://github.com/Lilmortal .

  • 我的项目“HexGraph-source-connector”在一个
    指定目录并将图像路径推送到主题。
  • 《HexGraph》项目捡起来,使用Akka, Actor 会得到
    单独所有像素十六进制代码并开始推送消息
    到另一个话题。
  • “HexGraph-stream”是我的 kafka 流部分。

  • 但它很长,我怀疑你会读lol。

    无论如何,我从一个主题中阅读,我收到这样的消息 {imagePath: {hexCode: #fff}}。
    图片路径是键,hexCode 是值。我可以有一对多的图像路径,所以我的想法是我的前端会有一个 websocket 来接收它。它将显示一个图像,其顶部有一个条形图,其中包含像素颜色代码的数量。例如有 4 个 #fff、28 个 #fef 等。

    因此,我想按 imagePath 分组,然后我想计算该 imagePath 的每个像素。

    例如:
  • {imagePath1: {hexCode: #fff, count: 47}}
  • {imagePath1: {hexCode: #fef, count: 61}}
  • {imagePath2: {hexCode: #fff, count: 23}}
  • {imagePath2: {hexCode: #fef, count: 55}}

  • 所以这里imagePath1有47个#fff,imagePath2有23个#fff。

    这就是我正在尝试做的 atm。

    最佳答案

    也许在分组之前通过组合键选择?像这样的东西:

    SteamsBuilder topology = new StreamsBuilder();

    topology.stream("input")
    .selectKey((k, v) -> k + v.hex)
    .groupByKey()
    .count()

    这不会 groupBy 两次,而是为您提供所需的效果。

    更新 评论后:
    class Image {
    public String imagePath;
    }

    class ImageAggregation {
    public String imagePath;
    public int count;
    }

    class ImageSerde implements Serde<Image> {
    // implement
    }

    class ImageAggregationSerde implements Serde<ImageAggregation> {
    // implement
    }

    KTable<String, ImageAggregation> table = topology
    .stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
    .groupBy((k, v) -> v.imagePath)
    .aggregate(ImageAggregation::new,
    (k, v, agg) -> {
    agg.imagePath = v.imagePath;
    agg.count = agg.count + 1;
    return agg;
    }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());

    更新 2 帖子更新后:
    class ImageHex {
    public String imagePath;
    public String hex;
    }

    class ImageHexAggregation {
    public String imagePath;
    public Map<String, Integer> counts;
    }

    class ImageHexSerde implements Serde<ImageHex> {
    // implement
    }

    class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
    // implement
    }

    KTable<String, ImageHexAggregation> table = topology
    .stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
    .groupBy((k, v) -> v.imagePath)
    .aggregate(ImageHexAggregation::new,
    (k, v, agg) -> {
    agg.imagePath = v.imagePath;
    Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
    agg.counts.put(v.hex, currentCount + 1));
    return agg;
    }, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());

    关于stream - 卡夫卡流 - 如何分组两次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48473840/

    26 4 0