
sockets - Alpakka UDP: How can I respond to received datagrams via the already bound socket?

Repost. Author: 行者123. Updated: 2023-12-03 12:08:21

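I am using Alpakka's Udp.bindFlow to forward incoming UDP datagrams to a Kafka broker. The legacy application that sends these datagrams requires a UDP response from the same port the message was sent to. I am struggling to model this behaviour, because it would require me to connect the output of the flow to its input.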

I tried the following solution, but it does not work because the response datagram is sent from a different source port:

import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.alpakka.udp.Datagram
import akka.stream.alpakka.udp.scaladsl.Udp
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object UdpInput extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val socket = new InetSocketAddress("0.0.0.0", 40000)
  val udpBindFlow = Udp.bindFlow(socket)
  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  val kafkaSink = Flow[Datagram].map(toProducerRecord).to(Producer.plainSink(producerSettings))

  def toProducerRecord(datagram: Datagram) = new ProducerRecord[String, String]("udp", datagram.data.utf8String)
  def toResponseDatagram(datagram: Datagram) = Datagram(ByteString("OK"), datagram.remote)

  // Does not model the behaviour I'm looking for because
  // the response datagram is sent from a different source port
  Source.asSubscriber
    .via(udpBindFlow)
    .alsoTo(kafkaSink)
    .map(toResponseDatagram)
    .to(Udp.sendSink)
    .run()
}
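
The port mismatch described in the question can be reproduced without Akka. The following is a minimal sketch that uses only java.net sockets; the object name SourcePortDemo and its helper methods are made up for illustration and are not part of Alpakka. It shows that a reply written through the socket that received the request carries that socket's port as its source port, while a reply written through any other socket carries a different one. A separate sending socket is presumably also the reason the Udp.sendSink attempt above does not satisfy the legacy application.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical demo object, not part of Alpakka or Akka.
object SourcePortDemo {
  private val loopback = InetAddress.getLoopbackAddress

  /** Blocks until one datagram arrives on `socket` and returns the sender's port. */
  private def senderPortOfNextDatagram(socket: DatagramSocket): Int = {
    val packet = new DatagramPacket(new Array[Byte](64), 64)
    socket.receive(packet)
    packet.getPort
  }

  /** Returns (server port, source port of the reply sent via the bound socket,
    * source port of the reply sent via a separate socket). */
  def observe(): (Int, Int, Int) = {
    // Port 0 lets the operating system pick a free port for each socket.
    val server   = new DatagramSocket(0, loopback)
    val client   = new DatagramSocket(0, loopback)
    val separate = new DatagramSocket(0, loopback)
    server.setSoTimeout(2000)
    client.setSoTimeout(2000)
    try {
      // The client sends a request to the server's bound port.
      val request = "ping".getBytes(UTF_8)
      client.send(new DatagramPacket(request, request.length, loopback, server.getLocalPort))
      val incoming = new DatagramPacket(new Array[Byte](64), 64)
      server.receive(incoming)

      val ok = "OK".getBytes(UTF_8)
      // Reply 1: written through the socket that received the request.
      server.send(new DatagramPacket(ok, ok.length, incoming.getSocketAddress))
      val viaBound = senderPortOfNextDatagram(client)
      // Reply 2: written through a different socket.
      separate.send(new DatagramPacket(ok, ok.length, incoming.getSocketAddress))
      val viaSeparate = senderPortOfNextDatagram(client)

      (server.getLocalPort, viaBound, viaSeparate)
    } finally {
      server.close(); client.close(); separate.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (serverPort, viaBound, viaSeparate) = observe()
    println(s"server port: $serverPort")
    println(s"reply via bound socket came from port: $viaBound")
    println(s"reply via separate socket came from port: $viaSeparate")
  }
}
```

Only the first reply would be accepted by a client that insists on hearing back from the port it wrote to.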

Best answer

I ended up using the GraphDSL to implement a cyclic graph. Thanks to dvim for pointing me in the right direction!

import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.alpakka.udp.Datagram
import akka.stream.alpakka.udp.scaladsl.Udp
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, MergePreferred, RunnableGraph, Source}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object UdpInput extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
  val socket = new InetSocketAddress("0.0.0.0", 40000)
  val udpBindFlow = Udp.bindFlow(socket)
  val udpResponseFlow = Flow[Datagram].map(toResponseDatagram)
  val kafkaSink = Flow[Datagram].map(toProducerRecord).to(Producer.plainSink(producerSettings))

  def toProducerRecord(datagram: Datagram) = new ProducerRecord[String, String]("udp", datagram.data.utf8String)
  def toResponseDatagram(datagram: Datagram) = Datagram(ByteString("OK"), datagram.remote)

  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    val merge = b.add(MergePreferred[Datagram](1))
    val bcast = b.add(Broadcast[Datagram](2))

    // Forward path: datagrams received on the bound socket are broadcast to Kafka
    Source.asSubscriber ~> merge ~> udpBindFlow ~> bcast ~> kafkaSink
    // Feedback path: each received datagram becomes an "OK" response that is
    // fed back into udpBindFlow, so it is sent through the already bound socket
    merge.preferred <~ udpResponseFlow <~ bcast
    ClosedShape
  }).run()
}
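
One way to check the graph above from the outside is a small probe client. The following is a sketch under assumptions: ReplyPortProbe and replySourcePort are hypothetical names, plain java.net is used instead of Alpakka, and UdpInput is assumed to be running locally on port 40000 as in the answer. The probe sends one datagram and reports the source port of the first reply, which should equal the port it wrote to.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress, SocketTimeoutException}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper for manual testing, not part of Alpakka or Akka.
object ReplyPortProbe {

  /** Sends `payload` to `target` and returns the source port of the first reply,
    * or None if no reply arrives within `timeoutMillis`. */
  def replySourcePort(target: InetSocketAddress, payload: String, timeoutMillis: Int): Option[Int] = {
    val socket = new DatagramSocket() // bound to an ephemeral local port
    try {
      socket.setSoTimeout(timeoutMillis)
      val bytes = payload.getBytes(UTF_8)
      socket.send(new DatagramPacket(bytes, bytes.length, target))
      val reply = new DatagramPacket(new Array[Byte](512), 512)
      socket.receive(reply)
      Some(reply.getPort)
    } catch {
      case _: SocketTimeoutException => None
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed address of the running UdpInput application.
    val target = new InetSocketAddress("127.0.0.1", 40000)
    replySourcePort(target, "hello", 2000) match {
      case Some(port) if port == target.getPort => println(s"reply came from port $port, as required")
      case Some(port)                           => println(s"reply came from unexpected port $port")
      case None                                 => println("no reply within the timeout")
    }
  }
}
```

Run against the first attempt in the question, the same probe would be expected to report an unexpected port, since that version answers through a different socket.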

Regarding "sockets - Alpakka UDP: How can I respond to received datagrams via the already bound socket?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52312784/
