gpt4 book ai didi

java - 动态集合上的 Flux

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:33:41 25 4
gpt4 key购买 nike

好吧,我有点困惑我应该如何将 Reactor 模式与 Spring 的 Webflux/Reactor API 一起使用。

假设我有一个不断添加文件的目录。每当出现一个新文件时,我的应用程序就应该可以处理它。如果队列已满,则应忽略新文件(因此没有 FileWatcher),直到队列中有空间为止。

以下只是伪代码,我试图理解大概的思路,而不是具体的实现细节。

类(class)目录观察员

管理一个计划任务,如果新文件出现,每 2 秒检查一次。如果是,我们尝试将它们添加到队列中。

@PostConstruct
public void initialize() {

this.flux = Flux.generate(consumer -> {
consumer.next(this.deque.pop());
});

this.flux.log();
}

@Scheduled(fixedRate = 2000)
public void checkDirectory() {
// list files, for each add to deque (if not present)
}

public Flux<Path> getObserver() {
return this.flux;
}

类文件处理器

一个一个地消费队列中的项目。

@PostConstruct
private void subscribe() {
this.observeDirectorytask.getObserver().subscribe(path -> {
log.info("Processing '{}'", path.getFileName());

// process file, delete it once done
});
}

这种方法是否有意义?如果是,我需要做什么才能在新项目添加到队列时触发我的订阅(现在这只在启动时执行一次)。

更新

这是我的工作实现:

public class DirectoryObserverTask {

@Autowired
private Path observedDirectory;

private Consumer<Path> newFilesConsumer;
private Consumer<Throwable> errorsConsumer;

private Flux<Path> flux;


@PostConstruct
public void init() {
this.observedDirectory = Paths.get(importDirectoryProperty);
}

public void subscribe(Consumer<Path> consumer) {
if(this.flux == null) {
this.flux = Flux.push(sink -> {
this.onError(err -> sink.error(err));
this.onNewFile(file -> sink.next(file));
});
this.flux = this.flux.onBackpressureBuffer(10, BufferOverflowStrategy.DROP_LATEST);
}

this.flux.subscribe(consumer);

}


@Scheduled(fixedRate = 2000)
public void checkDirectoryContent() throws IOException {
Files.newDirectoryStream(this.observedDirectory).forEach(path -> {
this.newFilesConsumer.accept(path);
});
}

public void onNewFile(Consumer<Path> newFilesConsumer) {
this.newFilesConsumer = newFilesConsumer;
}

public void onError(Consumer<Throwable> errorsConsumer) {
this.errorsConsumer = errorsConsumer;
}

消费者

@Autowired
private DirectoryObserverTask observeDirectorytask;

@PostConstruct
private void init() {
observeDirectorytask.subscribe(path -> {
this.processPath(path);
});
}

public void processPath(Path t) {
Mono.justOrEmpty(t)
.subscribe(path -> {
// handle the file
path.toFile().delete();
});
}

最佳答案

你不需要使用队列,这个行为已经是内置的。

我会做这样的事情:

  1. 使用文件观察器检测更改。

  2. 将更改推送到 Flux<File> .

  3. 根据要求,限制排队事件的数量(使用背压):

    filesFlux.onBackPressureBuffer(10, BufferOverflowStrategy.DROP_LATEST)

  4. 照常订阅。

对背压的一个糟糕的解释是:“当我们不能足够快地处理元素时该怎么办”。

在这种情况下,我们最多缓冲 10 个元素,然后在缓冲区已满时丢弃最新的元素。

更新:创建 Flux 的方法有很多种。在这种特殊情况下,我会看一下 createpush方法(参见 documentation )。

示例:假设您有一个 FileWatchService您可以在其中注册检测到新文件时的回调和出现错误时的回调。你可以这样做:

FileWatchService watcher = ...

Flux<File> fileFlux = Flux.push(sink -> {
watcher.onError(err -> sink.error(err));
watcher.onNewFile(file -> sink.next(file));
});

fileFlux
.onBackPressureBuffer(10, BufferOverflowStrategy.DROP_LATEST)
.subscribe(...)

关于java - 动态集合上的 Flux,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48099765/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com