gpt4 book ai didi

java - Flux.generate(...) 的预加载元素

转载 作者:行者123 更新时间:2023-12-02 08:29:47 29 4
gpt4 key购买 nike

我正在使用 Flux.generate() 创建一个 Flux。生成器(消费者)实际上是从消息队列中读取。问题是这个调用需要相当长的时间(有时甚至1-2秒)。这将使助焊剂停止处理。

package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

private final QueueManipulator queueManipulator;

@Override
public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

documentSourceItemSynchronousSink.next(
DocumentArchivingContext.builder()
.type(DocumentType.valueOf(documentArchivingMessage.getType()))
.source(documentArchivingMessage.getSource())
.content(documentArchivingMessage.getContent())
.build()
);
}
}

显然添加parallel并没有帮助,因为生成器仍然一次被调用一个。

Flux.generate(vaultQueueConsumer)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(vaultDocumentManager::archiveDocument)
.subscribe();

有人知道如何使发电机并联吗?我不想使用 Flux.create(),因为那样我就会失去背压。

最佳答案

Mono.just(1).repeat()  // create infinite flux, maybe there is a nicer way for that?
.flatMap(this::readFromQueue, 100) // define queue polling concurrency
.flatMap(this::archiveDocument)
.subscribe();
private Mono<String> readFromQueue(Integer ignore)
{
return Mono.fromCallable(() -> {
Thread.sleep(1500); // your actual blocking queue polling here
return "queue_element";
}).subscribeOn(Schedulers.elastic()); // dedicate blocking call to threadpool
}

关于java - Flux.generate(...) 的预加载元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59708845/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com