- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
用 Rx 编写 GetMessages
函数最简洁的方法是什么:
static void Main()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var messages = GetMessages(socket, IPAddress.Loopback, 4000);
messages.Subscribe(x => Console.WriteLine(x));
Console.ReadKey();
}
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// now will receive a stream of messages
// each message is prefixed with an 4 bytes/Int32 indicating it's length.
// the rest of the message is a string
// ????????????? Now What ?????????????
}
作为上述示例驱动程序的简单服务器:http://gist.github.com/452893#file_program.cs
我一直在调查使用 Reactive Extensions对于我正在做的一些套接字编程工作。我这样做的动机是它会以某种方式使代码“更简单”。这是否意味着更少的代码,更少的嵌套。
然而到目前为止,情况似乎并非如此:
Observable
有 FromAsyncPattern 的扩展方法,这不包括 SocketEventArgs
异步 API。这是我目前所拥有的。这不起作用,它因堆栈溢出而失败(呵呵)我还没有弄清楚语义,所以我可以创建一个 IObservable
来读取指定数量的字节。
static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// keep reading until we get the first 4 bytes
byte[] buffer = new byte[1024];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
IObservable<int> readBytes = null;
var temp = from totalRead in Observable.Defer(() => readBytes)
where totalRead < 4
select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
readBytes = temp.SelectMany(x => x).Sum();
var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}
最佳答案
按照这些思路可能会奏效。这没有经过测试,没有考虑异常和部分返回消息的情况。但除此之外,我相信这是一个正确的方向。
public static IObservable<T> GetSocketData<T>(this Socket socket,
int sizeToRead, Func<byte[], T> valueExtractor)
{
return Observable.CreateWithDisposable<T>(observer =>
{
var readSize = Observable
.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
socket.BeginReceive,
socket.EndReceive);
var buffer = new byte[sizeToRead];
return readSize(buffer, 0, sizeToRead, SocketFlags.None)
.Subscribe(
x => observer.OnNext(valueExtractor(buffer)),
observer.OnError,
observer.OnCompleted);
});
}
public static IObservable<int> GetMessageSize(this Socket socket)
{
return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
}
public static IObservable<string> GetMessageBody(this Socket socket,
int messageSize)
{
return socket.GetSocketData(messageSize, buf =>
Encoding.UTF8.GetString(buf, 0, messageSize));
}
public static IObservable<string> GetMessage(this Socket socket)
{
return
from size in socket.GetMessageSize()
from message in Observable.If(() => size != 0,
socket.GetMessageBody(size),
Observable.Return<string>(null))
select message;
}
public static IObservable<string> GetMessagesFromConnected(
this Socket socket)
{
return socket
.GetMessage()
.Repeat()
.TakeWhile(msg => !string.IsNullOrEmpty(msg));
}
public static IObservable<string> GetMessages(this Socket socket,
IPAddress addr, int port)
{
return Observable.Defer(() =>
{
var whenConnect = Observable
.FromAsyncPattern<IPAddress, int>(
socket.BeginConnect, socket.EndConnect);
return from _ in whenConnect(addr, port)
from msg in socket.GetMessagesFromConnected()
.Finally(socket.Close)
select msg;
});
}
编辑:为了处理不完整的读取,可以使用 Observable.While(在 GetSockedData 中),正如 Dave Sexton 在 same thread on RX forum 中所建议的那样.
编辑:另外,请看一下 Jeffrey Van Gogh 的这篇文章:Asynchronous System.IO.Stream reading
关于c# - 使用 Reactive Extensions (Rx) 进行套接字编程实用吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3118289/
我有一个问题,我想通过其他程序打开 chrome://extensions/页面,例如 cmd.exe 或其他程序。 我们知道,如果我们用chrome.exe打开一个网站,我们可以在cmd.exe中执
当您编写manifest.json 文件时,您必须为内容脚本指定匹配。 http 和 https 工作正常,但如果我尝试包含 chrome://*/* 或其任何变体,我会得到一个我尝试对我的匹配使用无
我真的很困惑我想制作一个可以扩展用户的Google日历的Chrome扩展程序,我应该在Google API下注册哪种程序? 它是Web App吗?但是我不打算让服务器托管任何东西,因为Chrome扩展
我想在带有chrome-extension://URL的iframe上运行内容脚本。我在我的manifest.json文件中添加了一行代码,该行是从http://code.google.com/chr
目前,我正在使用记事本和 chrome 控制台的组合对我的 google-chrome-extensions 进行编码。我 100% 确信有更好的方法来对这些扩展进行编程。人们使用什么环境? 最佳答案
在编写 manifest.json 文件时,必须指定 matches用于您的内容脚本。 http和 https工作正常,但如果我尝试包含 chrome://*/*或它的任何变体,我收到一个错误,提示我
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题吗? 通过 editing this post 添加细节并澄清问题. 关闭 7 年前。 Improve
在发布更新后,我正在尝试为我的 Chrome 扩展程序的用户创造流畅的体验。 我在更新应用程序时重新注入(inject)了我的内容脚本,即使用户继续在扩展更新后未刷新的页面上使用我的扩展,我的功能仍然
将扩展程序从 Chrome 移植到 FF 遵循本教程(在 Chrome 中运行良好):http://www.codingscripts.com/check-whether-user-has-a-chr
我正在将 google-chrome 扩展改编成 firefox。 这个扩展相当简单,它只是重新加载当前浏览器窗口并在其中放置一个特定的字符串(它用于在 Odoo 上激活调试状态)。 但是,当我在 m
我正在尝试在普通 HTML 页面(非扩展)中链接到 chrome://extensions。但是单击链接不会执行任何操作: chrome://extensions 右键单击并在新选项卡中打开只会打开
为 String 编写扩展名很容易,但问题是它总是显示为 "MyString".ExtensionMethod() 如果这样写: public static class Extensions{
如题。我正在运行 Joomla 2.5。 “扩展”下拉菜单中唯一可见的项目是: 模块经理 插件管理器 模板管理器 语言经理 编辑:我这样做是为了安装模板,按照此页面上的说明:http://docs.j
基本上我希望文件名以扩展名列表中的扩展名结尾。这是我在 python 中的代码。我已经将一些示例文件名作为列表,如下所示: extensions = ['.mp3','.m4a','.wma'] fi
在 background.html : chrome.tabs.query({active:true, currentWindow:true},function(tabs){ chrome.tab
我有一个可能被用户禁用的 chrome 扩展。在这种情况下,我想创建一个指向 chrome://extensions 菜单的链接。它会是这样的 Chrome extensions 这是不允许的:不允许
我查看了 Google 文档,但不知道如何更改其类型。 这是我加载时遇到的错误。 尝试安装此扩展时出现警告:“browser_action”仅允许用于扩展程序,这是一个旧版打包应用程序。 这是我的ma
我有一个正在构建的 chrome 扩展,它使用 OAuth 访问许多 API。我没有将我的消费者 secret 存储在扩展程序中,而是重定向到获取 token 的服务器,然后重定向回我的扩展程序中的页
我有一个正在构建的 chrome 扩展,它使用 OAuth 访问许多 API。我没有将我的消费者 secret 存储在扩展程序中,而是重定向到获取 token 的服务器,然后重定向回我的扩展程序中的页
这个问题已经有答案了: Why would a developer place a forward slash at the start of each relative path? (4 个回答)
我是一名优秀的程序员,十分优秀!