gpt4 book ai didi

scala - Akka 流如何持续实现?

转载 作者:行者123 更新时间:2023-12-04 17:46:37 25 4
gpt4 key购买 nike

我正在使用 Akka Streams在 Scala 中从 AWS SQS 进行投票队列使用 AWS Java SDK .我创建了一个 ActorPublisher它以两秒的间隔使消息出列:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
implicit val materializer = ActorMaterializer()

val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

val client = new AmazonSQSClient()
client.setRegion(RegionUtils.getRegion("us-east-1"))
val url = client.getQueueUrl(name).getQueueUrl

val MaxBufferSize = 100
var buf = Vector.empty[Message]

override def receive: Receive = {
case "dequeue" =>
val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
messages.foreach(self ! _)
case message: Message if buf.size == MaxBufferSize =>
log.error("The buffer is full")
case message: Message =>
if (buf.isEmpty && totalDemand > 0)
onNext(message)
else {
buf :+= message
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}

@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}

在我的应用程序中,我也尝试以 2 秒的间隔运行流:
val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
.map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
.to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
flow.runWith(sqsSource)(ActorMaterializer()(system))
}

但是,当我运行我的应用程序时,我收到 java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]以及由 ActorMaterializer 引起的后续死信通知.

是否有推荐的方法来持续实现 Akka 流?

最佳答案

我认为您不需要创建一个新的 ActorPublisher每 2 秒。这似乎是多余和浪费内存的。另外,我不认为 ActorPublisher 是必要的。根据我对代码的了解,您的实现将有越来越多的 Streams 查询相同的数据。每个Message来自客户端的数据将被 N 个不同的 akka Streams 处理,更糟糕的是,N 会随着时间的推移而增长。

用于无限循环查询的迭代器

您可以通过使用 Scala 的 Iterator 从您的 ActorPublisher 获得相同的行为。 .可以创建一个持续查询客户端的迭代器:

//setup the client
val client = {
val sqsClient = new AmazonSQSClient()
sqsClient setRegion (RegionUtils getRegion "us-east-1")
sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] =
Iterator continually messageListStream

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

此实现仅在所有先前的 Messages 都已被消费后才查询客户端,因此确实是 reactive .无需跟踪固定大小的缓冲区。您的解决方案需要一个缓冲区,因为消息的创建(通过计时器)与消息的消耗(通过 println)分离。在我的实现中,创建和消费是 tightly coupled通过背压。

Akka 流源

然后,您可以使用此迭代器生成器函数来提供 akka 流源:
def messageSource : Source[Message, _] = Source fromIterator messageIterator

流动形成

最后你可以使用这个 Source 来执行 println (附带说明:您的 flow 值实际上是 Sink ,因为 Flow + Sink = Sink )。使用您的 flow问题的值(value):
messageSource runWith flow

一个 akka Stream 处理所有消息。

关于scala - Akka 流如何持续实现?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32544467/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com