gpt4 book ai didi

scala - 如何使用 Akka Streams 实现一个简单的 TCP 协议(protocol)?

转载 作者:可可西里 更新时间:2023-11-01 02:31:40 28 4
gpt4 key购买 nike

我尝试实现了一个简单的基于 TCP 的协议(protocol),用于与 Akka Streams 交换消息(见下文)。但是,似乎传入 消息没有立即处理;也就是说,在客户端接连发送两条消息的场景中,第一条消息仅在从服务器发送某些内容后打印:

At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed

我期望/想看到的:

At t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed

我错过了什么?也许我需要以某种方式让两端的水槽变得急切?或者每个接收器是否以某种方式被相应的源阻塞(当源正在等待来自命令行的输入时)?

import java.nio.charset.StandardCharsets.UTF_8

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

object AkkaStreamTcpChatter extends App {
implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
implicit val materializer = ActorMaterializer()

type Message = String
val (host, port) = ("localhost", 46235)

val deserialize:ByteString => Message = _.utf8String
val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)

val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize

val protocol = BidiFlow.fromFlows(incoming, outgoing)

def prompt(s:String):Source[Message, _] = Source fromIterator {
() => Iterator.continually(StdIn readLine s"[$s]> ")
}

val print:Sink[Message, _] = Sink foreach println

args.headOption foreach {
case "server" => server()
case "client" => client()
}

def server():Unit =
Tcp()
.bind(host, port)
.runForeach { _
.flow
.join(protocol)
.runWith(prompt("S"), print)
}

def client():Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol)
.runWith(prompt("C"), print)
}

最佳答案

我认为问题在于 Akka Stream 确实 operator fusion .这意味着完整的流程处理在单个参与者上运行。当它阻止阅读您的消息时,它无法打印出任何内容。

解决方案是在您的源之后添加一个异步边界。请参阅下面的示例。

def server(): Unit =
Tcp()
.bind(host, port)
.runForeach {
_
.flow
.join(protocol)
.runWith(prompt("S").async, print) // note .async here
}

def client(): Unit =
Tcp()
.outgoingConnection(host, port)
.join(protocol).async
.runWith(prompt("C").async, print) // note .async here

当您添加异步边界时,融合不会跨越边界发生,并且 prompt 会在另一个 actor 上运行,因此不会阻止 print 显示任何内容.

关于scala - 如何使用 Akka Streams 实现一个简单的 TCP 协议(protocol)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36242183/

28 4 0