- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在 2 台不同的机器(仅)上实现全双工客户端-服务器通信方案,其中每个端点(客户端或服务器)可以随时异步发送内容(非阻塞管道),并且另一端会拿起它并阅读它。
(请不要通过引用命名管道以外的其他技术来回答;我需要使用这种特殊方法的答案。)
我读过命名管道必须是单向的,否则它们会锁定,但我猜这可能是错误的。我认为管道是基于套接字的,我无法想象底层套接字只是单向的。
这个问题的任何答案都需要解决这些问题才能真正有用:
最佳答案
您不必使用两个管道。我在网上找到了很多答案,说明您需要使用两个管道。我四处寻找,熬夜,试了又试,然后想出了怎么做, super 简单,但你必须把所有事情都做好(尤其是把事情按照正确的调用顺序),否则它就行不通了.另一个技巧是始终确保您有一个未完成的读取调用,否则它也会被锁定。在你知道有人在读之前不要写。除非您先设置了事件,否则不要开始读取调用。那种事。
这是我正在使用的管道类。它可能不够健壮,无法处理管道错误、关闭和溢出。
好的,我不知道这里出了什么问题,但是格式有点不对!
vvvv
namespace Squall
{
public interface PipeSender
{
Task SendCommandAsync(PipeCommandPlusString pCmd);
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public class ClientPipe : BasicPipe
{
NamedPipeClientStream m_pPipe;
public ClientPipe(string szServerName, string szPipeName)
: base("Client")
{
m_szPipeName = szPipeName; // debugging
m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
base.SetPipeStream(m_pPipe); // inform base class what to read/write from
}
public void Connect()
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
StartReadingAsync();
}
// the client's pipe index is always 0
internal override int PipeId() { return 0; }
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public class ServerPipe : BasicPipe
{
public event EventHandler<EventArgs> GotConnectionEvent;
NamedPipeServerStream m_pPipe;
int m_nPipeId;
public ServerPipe(string szPipeName, int nPipeId)
: base("Server")
{
m_szPipeName = szPipeName;
m_nPipeId = nPipeId;
m_pPipe = new NamedPipeServerStream(
szPipeName,
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
base.SetPipeStream(m_pPipe);
m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
}
static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
{
ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
pThis.GotPipeConnection(pAsyncResult);
}
void GotPipeConnection(IAsyncResult pAsyncResult)
{
m_pPipe.EndWaitForConnection(pAsyncResult);
Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");
if (GotConnectionEvent != null)
{
GotConnectionEvent(this, new EventArgs());
}
// lodge the first read request to get us going
//
StartReadingAsync();
}
internal override int PipeId() { return m_nPipeId; }
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public abstract class BasicPipe : PipeSender
{
public static int MaxLen = 1024 * 1024; // why not
protected string m_szPipeName;
protected string m_szDebugPipeName;
public event EventHandler<PipeEventArgs> ReadDataEvent;
public event EventHandler<EventArgs> PipeClosedEvent;
protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];
PipeStream m_pPipeStream;
public BasicPipe(string szDebugPipeName)
{
m_szDebugPipeName = szDebugPipeName;
}
protected void SetPipeStream(PipeStream p)
{
m_pPipeStream = p;
}
protected string FullPipeNameDebug()
{
return m_szDebugPipeName + "-" + m_szPipeName;
}
internal abstract int PipeId();
public void Close()
{
m_pPipeStream.WaitForPipeDrain();
m_pPipeStream.Close();
m_pPipeStream.Dispose();
m_pPipeStream = null;
}
// called when Server pipe gets a connection, or when Client pipe is created
public void StartReadingAsync()
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");
// okay we're connected, now immediately listen for incoming buffers
//
byte[] pBuffer = new byte[MaxLen];
m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");
int ReadLen = t.Result;
if (ReadLen == 0)
{
Debug.WriteLine("Got a null read length, remote pipe was closed");
if (PipeClosedEvent != null)
{
PipeClosedEvent(this, new EventArgs());
}
return;
}
if (ReadDataEvent != null)
{
ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
}
else
{
Debug.Assert(false, "something happened");
}
// lodge ANOTHER read request
//
StartReadingAsync();
});
}
protected Task WriteByteArray(byte[] pBytes)
{
// this will start writing, but does it copy the memory before returning?
return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
}
public Task SendCommandAsync(PipeCommandPlusString pCmd)
{
Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
Task t = WriteByteArray(pSerializedCmd);
return t;
}
}
/******************************************************************************
*
*
*
*
******************************************************************************/
public class PipeEventArgs
{
public byte[] m_pData;
public int m_nDataLen;
public PipeEventArgs(byte[] pData, int nDataLen)
{
// is this a copy, or an alias copy? I can't remember right now.
m_pData = pData;
m_nDataLen = nDataLen;
}
}
/******************************************************************************
* if we're just going to send a string back and forth, then we can use this
* class. It it allows us to get the bytes as a string. sort of silly.
******************************************************************************/
[Serializable]
public class PipeCommandPlusString
{
public string m_szCommand; // must be public to be serialized
public string m_szString; // ditto
public PipeCommandPlusString(string sz, string szString)
{
m_szCommand = sz;
m_szString = szString;
}
public string GetCommand()
{
return m_szCommand;
}
public string GetTransmittedString()
{
return m_szString;
}
}
}
namespace NamedPipeTest
{
public partial class Form1 : Form
{
SynchronizationContext _context;
Thread m_pThread = null;
volatile bool m_bDieThreadDie;
ServerPipe m_pServerPipe;
ClientPipe m_pClientPipe;
public Form1()
{
InitializeComponent();
}
private void Form1_Load(object sender, EventArgs e)
{
_context = SynchronizationContext.Current;
m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;
// m_pThread = new Thread(StaticThreadProc);
// m_pThread.Start( this );
}
private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
{
Debug.WriteLine("Server: Pipe was closed, shutting down");
// have to post this on the main thread
_context.Post(delegate
{
Close();
}, null);
}
private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
{
// this gets called on an anonymous thread
byte[] pBytes = e.m_pData;
string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length);
PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
string szValue = pCmd.GetTransmittedString();
if (szValue == "CONNECT")
{
Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
// fire off an async write
Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
}
}
static void StaticThreadProc(Object o)
{
Form1 pThis = o as Form1;
pThis.ThreadProc();
}
void ThreadProc()
{
m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
m_pClientPipe.Connect();
PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
int Counter = 1;
while (Counter++ < 10)
{
Debug.WriteLine("Counter = " + Counter);
m_pClientPipe.SendCommandAsync(pCmd);
Thread.Sleep(3000);
}
while (!m_bDieThreadDie)
{
Thread.Sleep(1000);
}
m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
m_pClientPipe.Close();
m_pClientPipe = null;
}
private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
{
// wait around for server to shut us down
}
private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
{
byte[] pBytes = e.m_pData;
string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
string szValue = pCmd.GetTransmittedString();
Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
if (szValue == "CONNECTED")
{
PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
m_pClientPipe.SendCommandAsync(pCmdToSend);
}
}
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
{
if (m_pThread != null)
{
m_bDieThreadDie = true;
m_pThread.Join();
m_bDieThreadDie = false;
}
m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
m_pServerPipe.Close();
m_pServerPipe = null;
}
}
}
关于.net - c# 全双工异步命名管道 .NET,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34478513/
我必须实现使用特定源端口发送数据并同时监听该端口。全双工。有谁知道如何在java上实现它。我试图创建单独的线程来监听套接字输入流,但它不起作用。我无法将 ServerSocket 和客户端套接字绑定(
我有一个在 Jersey (GlassFish 3.1.1) 上运行的 RESTful Web 服务。这个有点不寻常——它使用 HTTP 流。客户端和服务器有一个长时间运行的对话,在对话中它们不断地向
我在两个主机之间建立了全双工连接,这两个主机与一个始终监听 NetworkStream.Read() 的线程交换数据。我怎样才能优雅地关闭连接避免: 1-两侧read()函数的死锁 2- 读取线程的
该项目是通过 ZeroMQ 构建 Python 和 C# 程序之间的消息传递机制。 我希望消息能够随时从两端传入/传出,这不是基本的请求-回复模型,也就是 REQ/REP。我能想到的一种方法是在两个端
是否有 C 中全双工 ALSA 连接的示例?我读到它受支持,但我看到的所有介绍性示例都记录或播放了声音样本,但我希望有一个处理程序可以为我的 VoIP 应用程序执行这两项操作。 非常感谢您的帮助,延斯
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 2 年前。 Improve this qu
我是一名优秀的程序员,十分优秀!