精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
锐英源精品原创,禁止全文或局部转载,禁止任何形式的非法使用,侵权必究。点名“简易百科”和闲暇巴盗用锐英源原创内容。
最近开发平台,需要进程间通信,因为不复杂,就用了命名管道,在我理解,命名管道的系统压力比SOCKET通信要小,因为不涉及到通信协议栈,用SOCKET进行进程间通信进行集群总线通信锐英源实现过,但是这里不需要SOCKET级别的,所以用命名管道。复杂的平台,比如大数据平台,要求稳定的总线软件,可以用RabbitMQ或RocketMQ,这些软件锐英源也都精通,不管是平台搭建还是底层调优,都可以实现。
命名管道并不简单,本文有细节是需要关注,比如ReadMode和管道分类,如果只是想从网上抄零碎代码还是不好。本文还对架构上进行了扩展,比如令牌。
本文介绍了一个消息传递库,可用于在同一网络上运行的两个 .NET 应用程序之间发送消息。该库允许string在客户端和服务器应用程序之间进行简单的发送。
该库在其内部实现中使用命名管道。有关命名管道的更多信息,请访问MSDN页面。
我还使用以下 CodeProject 文章作为开发库的起点:
我最近遇到了一个场景,我需要通知应用程序升级正在等待开始。我需要一个简单的机制来允许我向应用程序发出信号。一旦发出信号,应用程序将安全关闭,从而开始升级。
在开发这个库之前,我调查了一些 IPC 解决方案。我选择了命名管道,因为它是我能找到的最轻量级的 IPC 方法。
该库使用简单,有两个主要入口点,thePipeServer和PipeClient. 以下代码片段是从源代码中包含的示例应用程序复制而来的:
var pipeServer = new PipeServer("demo", PipeDirection.InOut);
pipeServer.MessageReceived += (s, o) => pipeServer.Send(o.Message);
pipeServer.Start();
var pipeClient = new PipeClient("demo", PipeDirection.InOut);
pipeClient.MessageReceived += (s, o) => Console.WriteLine("Client Received: value: {0}", o.Message);
pipeClient.Connect();
示例应用程序演示了如何使用该库来构建一个简单的回显服务器。MessageReceived事件处理程序简单地回显从客户端接收到的任何消息。
start 方法调用BeginWaitingForConnection传递一个state对象。该Start方法被重载,允许客户端提供取消令牌。
public void Start(CancellationToken token)
{
if (this.disposed)
{
throw new ObjectDisposedException(typeof(PipeServer).Name);
}
var state = new PipeServerState(this.ServerStream, token);
this.ServerStream.BeginWaitForConnection(this.ConnectionCallback, state);
}
该stop方法只是调用Cancel内部的方法CancelllationTokenSource。调用Cancel 设置IsCancelationRequested优雅终止服务器的令牌的属性。
public void Stop()
{
if (this.disposed)
{
throw new ObjectDisposedException(typeof(PipeServer).Name);
}
this.cancellationTokenSource.Cancel();
}
该send方法首先将提供的转换string为字节数组。然后stream使用. BeginWrite_PipeStream
public void Send(string value)
{
if (this.disposed)
{
throw new ObjectDisposedException(typeof(PipeClient).Name);
}
byte[] buffer = Encoding.UTF8.GetBytes(value);
this.ServerStream.BeginWrite(buffer, 0, buffer.Length, this.SendCallback, this.ServerStream);
}
ReadCallback在传入流中接收到数据时调用该方法。接收到的字节首先从 中读取stream,解码回astring并存储在state对象中。
如果IsMessageComplete设置了该属性,则表明客户端已完成发送当前消息。MessageReceived调用事件并清除Message缓冲区。
如果服务器没有停止 - 由取消令牌指示 - 并且客户端仍然连接,则服务器继续读取数据,否则服务器开始等待下一个连接。
private void ReadCallback(IAsyncResult ar)
{
var pipeState = (PipeServerState)ar.AsyncState;
int received = pipeState.PipeServer.EndRead(ar);
string stringData = Encoding.UTF8.GetString(pipeState.Buffer, 0, received);
pipeState.Message.Append(stringData);
if (pipeState.PipeServer.IsMessageComplete)
{
this.OnMessageReceived(new MessageReceivedEventArgs(stringData));
pipeState.Message.Clear();
}
if (!(this.cancellationToken.IsCancellationRequested ||
pipeState.ExternalCancellationToken.IsCancellationRequested))
{
if (pipeState.PipeServer.IsConnected)
{
pipeState.PipeServer.BeginRead(pipeState.Buffer, 0, 255, this.ReadCallback, pipeState);
}
else
{
pipeState.PipeServer.BeginWaitForConnection(this.ConnectionCallback, pipeState);
}
}
}
该connect方法简单地建立与 的连接PipeServer,并开始读取从服务器接收到的第一条消息。一个有趣的警告是,您只能在ClientStream建立连接后设置ReadMode 。
public void Connect(int timeout = 1000)
{
if (this.disposed)
{
throw new ObjectDisposedException(typeof(PipeClient).Name);
}
this.ClientStream.Connect(timeout);
this.ClientStream.ReadMode = PipeTransmissionMode.Message;
var clientState = new PipeClientState(this.ClientStream);
this.ClientStream.BeginRead(
clientState.Buffer,
0,
clientState.Buffer.Length,
this.ReadCallback,
clientState);
}
的send方法与PipeClient非常相似PipeServer。提供string的被转换为字节数组,然后写入stream.
public void Send(string value)
{
if (this.disposed)
{
throw new ObjectDisposedException(typeof(PipeClient).Name);
}
byte[] buffer = Encoding.UTF8.GetBytes(value);
this.ClientStream.BeginWrite(buffer, 0, buffer.Length, this.SendCallback, this.ClientStream);
}
该ReadCallback方法再次类似于PipeServer.ReadCallback没有增加处理取消的复杂性的方法。
private void ReadCallback(IAsyncResult ar)
{
var pipeState = (PipeClientState)ar.AsyncState;
int received = pipeState.PipeClient.EndRead(ar);
string stringData = Encoding.UTF8.GetString(pipeState.Buffer, 0, received);
pipeState.Message.Append(stringData);
if (pipeState.PipeClient.IsMessageComplete)
{
this.OnMessageReceived(new MessageReceivedEventArgs(pipeState.Message.ToString()));
pipeState.Message.Clear();
}
if (pipeState.PipeClient.IsConnected)
{
pipeState.PipeClient.BeginRead(pipeState.Buffer, 0, 255, this.ReadCallback, pipeState);
}
}
System.IO.Pipes命名空间包含用于AnonymousPipes和的托管 API NamedPipes。MSDN页面声明如下:
匿名管道
引用:匿名管道是单向的,不能通过网络使用。它们仅支持单个服务器实例。匿名管道对于线程之间或父子进程之间的通信很有用,其中管道句柄可以在创建时轻松传递给子进程
命名管道
引用:命名管道提供管道服务器和一个或多个管道客户端之间的进程间通信。命名管道可以是单向管道或双工管道。它们支持基于消息的通信,并允许多个客户端使用相同的管道名称同时连接到服务器进程。
我将库基于命名管道,因为我希望库支持双工通信。此外,匿名管道的父子模型不适合我的场景。
命名管道提供两种传输模式,字节模式和消息模式。
在这两种模式下,一侧的写入并不总是会导致另一侧的读取大小相同。这意味着客户端应用程序和服务器应用程序不知道在任何给定时刻正在从管道读取或写入多少字节。
在字节模式下,可以通过搜索和 End-Of-Message 标记来识别完整消息的结尾。这将需要定义一个应用程序级协议,这会给库增加不必要的复杂性。
在消息模式下,可以通过读取IsMessageComplete属性来识别消息的结束。该IsMessageComplete属性true通过调用Read或设置为EndRead。