websocket - Kafka messages to websocket

Reposted. Author: 行者123. Updated: 2023-12-03 00:00:56

I am trying to write a Kafka consumer to a websocket stream using reactive-kafka, akka-http and akka-stream.

  val publisherActor = actorSystem.actorOf(CommandPublisher.props)
  val publisher = ActorPublisher[String](publisherActor)
  val commandSource = Source.fromPublisher(publisher) map toMessage
  def toMessage(c: String): Message = TextMessage.Strict(c)

  class CommandPublisher extends ActorPublisher[String] {
    override def receive = {
      case cmd: String =>
        if (isActive && totalDemand > 0)
          onNext(cmd)
    }
  }

  object CommandPublisher {
    def props: Props = Props(new CommandPublisher())
  }

  // This is the route
  def mainFlow(): Route = {
    path("ws" / "commands") {
      handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    }
  }

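From the Kafka consumer (omitted here), I call publisherActor ! commandString to push content to the websocket dynamically.

However, when I start more than one websocket client, I get this exception on the backend: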

[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
...

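Can't a single flow be used for all websocket clients? Or should a flow/publisher actor be created for each client?

What I intend here is to send "current"/"live" notifications to all websocket clients. The notification history does not matter, and new clients should ignore it.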

Best answer

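Sorry to be the bearer of bad news, but this looks like a deliberate design choice in akka. You cannot reuse a single instance of a flow for all clients the way you want to. Because of the Rx model, the fan-out has to be "explicit".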

An example I came across that uses a route-specific Flow:

  // The flow from beginning to end to be passed into handleWebsocketMessages
  def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] =
    Flow[Message]
      // First we convert the TextMessage to a ReceivedMessage
      .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) }
      // Then we send the message to the dispatch actor which fans it out
      .via(dispatchActorFlow(sender))
      // The message is converted back to a TextMessage for serialization across the socket
      .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") }

  def route =
    (get & path("chat") & parameter('name)) { name =>
      handleWebsocketMessages(websocketDispatchFlow(sender = name))
    }
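
For the question's case (one stream of commands, many websocket clients), one way to spell out the fan-out is a BroadcastHub. As far as I know it was added to Akka Streams after this question was asked, so treat its availability in your version as an assumption. The sketch below is not the code from the question: it swaps the ActorPublisher for Source.actorRef, and the names CommandBroadcast and commandActor as well as the buffer sizes are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

// Hypothetical wrapper object; all names and sizes here are illustrative.
object CommandBroadcast {
  implicit val system: ActorSystem = ActorSystem("KafkaWs")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Materialize the upstream exactly once.
  // commandActor:  send Strings to it (e.g. from the Kafka consumer).
  // commandSource: a Source that can be materialized once per client.
  val (commandActor, commandSource) =
    Source.actorRef[String](64, OverflowStrategy.dropHead) // buffer of 64, drop oldest
      .map(c => TextMessage.Strict(c): Message)
      .toMat(BroadcastHub.sink[Message](bufferSize = 256))(Keep.both)
      .run()

  // Keep the hub drained while no client is connected, so that clients
  // connecting later do not receive a backlog of old commands.
  commandSource.runWith(Sink.ignore)

  def mainFlow(): Route =
    path("ws" / "commands") {
      // Every connection materializes its own subscription to the hub.
      handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
    }
}
```

With this in place the Kafka consumer would call CommandBroadcast.commandActor ! commandString instead of publisherActor ! commandString. Note that the hub adapts to its slowest consumer, so one stalled client can delay the others until the buffers fill up and the oldest commands are dropped.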

Here is a discussion of this:

And this is exactly what I don't like in Akka Stream, this explicit fan-out. When I receive a data source from somewhere that I want to process (e.g. Observable or a Source), I just want to subscribe to it and I don't want to care on whether it's cold or hot or whether it's been subscribed by other subscribers or not. This is my river analogy. The river should not care about who drinks from it and the drinkers should not care about the river's source or about how many other drinkers there are. My sample, that is equivalent to the one Mathias provided, does share the data-source, but it simply does reference counting and you can have 2 subscribers or you can have 100, doesn't matter. And here I've gotten fancy, but reference counting doesn't work if you don't want to lose events or if you want to ensure that the stream remains always-on. But then you use ConnectableObservable which has connect(): Cancelable and that's a perfect fit for say ... a Play's LifeCycle Plugin. And underlying that you can use a BehaviorSubject or a ReplaySubject if you want to repeat previous values for new subscribers. And things just work after that, no manual drawing of that connections graph needed.

... ... (this is from https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html) ...

For functions that take an Observable and return an Observable, we indeed have lift, which is the closest thing to something that has a name and can be used to great effect in Monifu for Subject or other Observable types because of the LiftOperators1 (and 2), which is what makes it possible to transform Observables without losing their type - this is an OOP-ish improvement over what RxJava does with lift.

But, such functions are not equivalent to Processor / Subject. The difference is that Subject is at the same time a consumer and a producer. This means that subscribers do not get to control exactly when the data-source starts and that the data-source is in essence hot (meaning that multiple subscribers share the same data-source). In Rx it's totally fine if you model cold observables (meaning observables that start a new data-source per each individual subscriber). On the other hand in Rx (in general) it's not OK to have data sources that can be subscribed only once and then that's it. The only exception to this rule in Monifu are the Observables produced by the GroupBy operator, but that's like the exception that confirms the rule.

What this means, especially coupled with another restriction of the contract of both Monifu and the Reactive Streams protocol (thou shall not subscribe multiple times with the same consumer), is that a Subject or a Processor instance is not reusable. In order for such an instance to be reusable, the Rx model would need a factory of Processor. Furthermore this means that whenever you want to use a Subject / Processor, your data source must automatically be hot (shareable between multiple subscribers).
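
In Akka Streams terms, the "factory" mentioned in the quote roughly corresponds to a Source blueprint that is materialized once per connection. Here is a minimal sketch of that idea, assuming a hand-written actor does the fan-out: each materialization of Source.actorRef registers its own ActorRef with a hypothetical Broadcaster actor, which forwards every command to all registered clients. Broadcaster, Register and PerClientSource are illustrative names, not part of Akka or of the original answer.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorRef, Props, Terminated}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

object Broadcaster {
  // Sent once by every newly materialized per-client source.
  final case class Register(client: ActorRef)
  def props: Props = Props(new Broadcaster)
}

// Hypothetical fan-out actor: keeps the set of live clients and
// forwards each incoming command to all of them.
class Broadcaster extends Actor {
  import Broadcaster._
  private var clients = Set.empty[ActorRef]

  override def receive: Receive = {
    case Register(client) =>
      context.watch(client) // the source actor stops when its stream ends
      clients += client
    case Terminated(client) =>
      clients -= client
    case cmd: String =>
      clients.foreach(_ ! cmd)
  }
}

object PerClientSource {
  import Broadcaster.Register

  // A blueprint: every materialization creates a fresh ActorRef
  // and registers it, so the same value is safe to reuse per client.
  def apply(broadcaster: ActorRef): Source[Message, NotUsed] =
    Source.actorRef[String](64, OverflowStrategy.dropHead) // buffer of 64, drop oldest
      .mapMaterializedValue { client =>
        broadcaster ! Register(client)
        NotUsed
      }
      .map(c => TextMessage.Strict(c): Message)
}
```

The route would then pass Flow.fromSinkAndSource(Sink.ignore, PerClientSource(broadcaster)) to handleWebSocketMessages, and the Kafka consumer would send each command string to the broadcaster. Clients that connect later only see commands sent after they registered.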

Regarding websocket - Kafka messages to websocket, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36348020/
