gpt4 book ai didi

scala - 将 TCP 套接字转为 Array[Byte] 的 Observable

转载 作者:可可西里 更新时间:2023-11-01 02:50:48 25 4
gpt4 key购买 nike

在我的 Android 应用程序中,我需要使用 Socket 来发送和接收字节数组。为了方便起见,我想使用连接到 SocketObservable

在互联网上我找到了这段代码:

import rx.lang.scala.Observable

val s = Observable.using[Char,Socket](new Socket("10.0.2.2", 9002))(
socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable),
socket => Try(socket.close))
.subscribeOn(rx.lang.scala.schedulers.IOScheduler.apply)

val a = s.subscribe(println, println)

它工作但一次输出一个字符,例如当发送“hello there”字符串时,输出是:

I/System.out: h
I/System.out: e
I/System.out: l
I/System.out: l
I/System.out: o
I/System.out:
I/System.out: t
I/System.out: h
I/System.out: e
I/System.out: r
I/System.out: e

但我想在我的订阅中接收一个缓冲的字节数组。我怎样才能做到这一点?

最佳答案

正如@SeanVieira 所说,您首先必须决定如何聚合流元素、字符。如果您知道流将在每条消息后关闭,您可以等待接收到整个消息,然后在 onCompleted() 上发出序列。

我认为到目前为止你实现的很好,因为它取决于观察者想要处理什么以及如何处理字符。

然后,您可以根据需要添加流转换,例如。 G。

  • 一个buffer ,尤其是看看 ​​tumblingBuffer(boundary))
  • debounce使用缓冲区,就像在这个 SO answer 中所做的那样

在您已创建的 Observable 上使用 tumblingBuffer 的解决方案可能如下所示(未测试):

 source.tumblingBuffer(source.filter(_ == '\n'))

在边界可观察的 source.filter(...) 发出一个元素后,您可以缓冲从源传入的任何内容并发出整个缓冲区。然后您可以使用 mkString 将字符序列转换为字符串并订阅那个 Observable:

source.tumblingBuffer(source.filter(_ == '\n')).map(mkString(_))

关于scala - 将 TCP 套接字转为 Array[Byte] 的 Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41523460/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com