gpt4 book ai didi

java - 从 Spout 发出自定义 Java 对象作为元组

转载 作者:行者123 更新时间:2023-11-30 08:47:06 25 4
gpt4 key购买 nike

我是 Apache Storm 的新手。为了理解它,我查看了 Storm 教程 (https://github.com/apache/storm/tree/master/examples/storm-starter) 中提供的 Storm 示例。

在所有示例中,我们都将原始类型(String、int 等)作为元组发出。

在我的用例中,我有一个 Java 对象,我想将其作为元组发送到 bolt 。

我不知道该怎么做。看来我必须实现自定义元组生成器才能将 Java 对象转换为 Values

任何人都可以提供一些示例如何做到这一点:

对于 ex 我的 Java 对象是:

class Bean 
{
int A;
String B;
Bean2 b;
//setters and getters
}

class Bean2
{
//Some Attributes
}

现在,在我的 Spout 的 nextTuple() 方法中,我有一个 Bean 对象的实例。

class Spout implements IRichSpout
{
//...
void nextTuple()
{
Bean b = queue.poll();//queue contains instances of Bean Object
//How to convert this object to tuple and emit it as Tuples???
}
}

我如何翻译成 Tuple 并发出它并通过我的 bolt 使用它。

最佳答案

有两种可能:

  1. 仅发出一个包含 Bean 的属性元组:

    class Spout implements IRichSpout {
    void nextTuple() {
    Bean b = queue.poll();
    collector.emit(new Values(b));
    }
    }

    这是最简单的方法,但有一个缺点,即您不能使用 fieldGrouping(...) 中的各个属性来使用 bolt 。此外,您应该在 Kryo 中注册 Bean 以实现高效(反)序列化:

    Config c = new Config();
    c.registerSerialization(Bean.class);
    StormSubmitter.submitTopology("name", c, builder.createTopology());
  2. 单独发出每个属性:

    class Spout implements IRichSpout {
    void nextTuple() {
    Bean b = queue.poll();
    // of course, you can also un-nest Bean2
    collector.emit(new Values(b.getA(), b.getB(), b.getBean2()));
    }
    }

    这允许在 fieldsGrouping(...) 中使用每个属性(或属性组合)。但是,在消费者端,您需要单独检索每个属性并使用您的 setter 方法构建一个新的 Bean 对象。

作为第二种方法的替代方法,请参阅 How to use apache storm tuple

关于java - 从 Spout 发出自定义 Java 对象作为元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32610340/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com