gpt4 book ai didi

apache-storm - Storm 中的三叉戟状态是什么?

转载 作者:行者123 更新时间:2023-12-04 11:08:18 29 4
gpt4 key购买 nike

我是 Storm 中 Trident 的新手。我对 TridentState 感到很头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 ID 来完全处理),我不完全确定以下语句的作用

TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());

谁能解释一下当我们定义上面的代码时实际发生了什么?

最佳答案

我希望现在回答永远不会太晚,至少其他人可能会发现我的回答有用:)

所以,topology.newStaticState()是 Trident 对可查询数据存储的抽象。 newStaticState() 的参数应该是一个实现——基于方法的契约——storm.trident.state.StateFactory .反过来,工厂应该实现 makeState()方法返回 storm.trident.state.State 的实例.然而,如果你打算查询你的状态,你应该返回一个 storm.trident.state.map.ReadOnlyMapState 的实例。相反,因为普通 storm.trident.state.State没有查询实际数据源的方法(如果您尝试使用除 ReadOnlyMapState 之外的任何内容,您实际上会得到一个类转换异常)。

所以,让我们试一试吧!

一个虚拟状态实现:

public static class ExampleStaticState implements ReadOnlyMapState<String> {

private final Map<String, String> dataSourceStub;

public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}

@Override
public List<String> multiGet(List<List<Object>> keys) {

System.out.println("DEBUG: MultiGet, keys is " + keys);

List<String> result = new ArrayList<>();

for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}

return result;
}

@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}

@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}

一个工厂:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}

一个简单的 psvm (又名 public static void main):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;

Config conf = new Config();
conf.setNumWorkers(6);

LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());

spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));

localCluster.shutdown();
}

最后,输出:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]

您会看到,stateQuery() 从输入批次中获取值并将它们映射到在“数据存储”中找到的值。

再深入一点,可以看一下 MapGet的源码类(其实例用于在拓扑内部进行查询的人)并在那里找到以下内容:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}

@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}

所以在引擎盖下它只是简单地调用 multiGet()您的方法 ReadOnlyMapState实现,然后发出在数据存储中找到的值,将它们添加到已经存在的元组中。您可以(尽管这可能不是最好的做法)创建自己的 BaseQueryFunction<ReadOnlyMapState, Object> 实现。做一些更复杂的事情。

关于apache-storm - Storm 中的三叉戟状态是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17628108/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com