gpt4 book ai didi

java - 单元测试具有外部依赖性的 apache beam 有状态管道

转载 作者:行者123 更新时间:2023-12-05 03:40:55 25 4
gpt4 key购买 nike

我有一个 apache beam 管道,它从 pubsub 读取数据,使用 Redis 丰富数据,最后写入 pubsub。我正在尝试编写测试来测试作为有状态 DoFn 的丰富 Dofn。这里内部状态充当近缓存以减少对 Redis 的调用。为了实例化我的 Redis 客户端,我使用了在 PipelineOptions 中声明的工厂,例如

@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();

void setRedisClient(RedisClient client);

理论上,上面的client应该是每个worker的单例。在我的单元测试中,我试图模拟 redis 客户端中的一些东西。我的测试看起来像这样 -

//setup pipeline
TestStream<MetricsInstance> inputStream =
TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream = pipeline.apply(inputStream)
.apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
.apply(ParDo.of(new EnrichMetricsInstanceDoFn()));


CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);

当我尝试运行这个测试时,我遇到了这样的错误

java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'

为了使框架不尝试序列化客户端,我可以在我的 Options 类中的 getRedisClient() 上添加 @JsonIgnore。但这会导致 Redis 实例在某个时候被重新创建,并且我所有的模拟和 stub 都丢失了。我想知道测试此类场景的最佳方法是什么。

最佳答案

在 Apache Beam 邮件列表上进行了一些讨论之后,我能够让这个东西发挥作用。诀窍是以一种方式设置 RedisClientFactory,它使用管道选项中的另一个字段,该字段公开 RedisClient 类的名称。

所以选项看起来像这样 -

    @Default.Class(RedisClientImpl.class)
Class<? extends RedisClient> getRedisClientClass();

void setRedisClientClass(Class<? extends RedisClient> redisClientClass);

@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();

void setRedisClient(RedisClient client);

Factory是这样实现的-

public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
@Override
public RedisClient create(PipelineOptions options) {

CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
return InstanceBuilder.ofType(RedisClient.class)
.fromClass(pipelineOptions.getRedisClientClass())
.fromFactoryMethod("fromOptions")
.withArg(PipelineOptions.class, options)
.build();
}

}

这个工厂使用 RedisClientImpl 类中名为 fromOptions 的方法来构建客户端。

  public static RedisClientImpl fromOptions(PipelineOptions options) {
return new RedisClientImpl(options.as(CommonPipelineOptions.class));
}

使用此设置,我现在可以在我的单元测试中创建 RedisClient 的模拟实例。

options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
options.setRedisClientClass(FakeRedisClient.class);
...
// setup fake data in the FakeRedisClient by calling static methods
FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
...
pipeline.run(options);

我们还需要确保 FakeRedisClient 类也公开了一个名为 fromOptions 的方法

  public static FakeRedisClient fromOptions(PipelineOptions options) {
return new FakeRedisClient();
}

关于java - 单元测试具有外部依赖性的 apache beam 有状态管道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67963189/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com