gpt4 book ai didi

javascript - 双向流转换器的设计模式

转载 作者:太空宇宙 更新时间:2023-11-04 02:33:41 25 4
gpt4 key购买 nike

我想将网络协议(protocol)实现对象设计为完全与套接字无关,并纯粹充当双向转换器。因此,协议(protocol)对象应该从“控制”端输入对象或命令,并从“网络”端发出字节,并接受来自“网络”端的字节以转换为对象/响应并从“控制”端发出。

我无法选择优雅的设计模式来在 Node.js 中执行此操作。我希望它完全兼容 Stream,到目前为止我最终采用了这种方法:

socket = getSocketSomehow();
proto = new Protocol();

socket.pipe(proto.aux);
proto.aux.pipe(socket);

proto.write({ foo: 'this', bar: ['is', 'command'] });
proto.once('data', function(response) {
console.log('this is response: ' + response.quux);
});

proto 是两个交叉连接的双工流的集合,它本身是 stream.Duplex 以及 aux。传入的网络数据进入 proto.aux,然后从 proto 解析并作为对象发出。传入对象转到 proto 并组合成字节并从 proto.aux 发出。

有更好的方法来做同样的事情吗?

最佳答案

我以以下方法结束。代码示例采用 CoffeeScript 格式,以提高可读性。

<小时/>

Bond 类实现 Duplex 流接口(interface),但将两个不相关的流绑定(bind)在一起,因此读取和写入被代理到单独的流。

'use strict'

{ EventEmitter } = require 'events'

class Bond extends EventEmitter
proxyReadableMethod = (method) =>
@::[method] = -> @_bondState.readable[method] arguments...

proxyWritableMethod = (method) =>
@::[method] = -> @_bondState.writable[method] arguments...

proxyReadableMethod 'read'
proxyReadableMethod 'setEncoding'
proxyReadableMethod 'resume'
proxyReadableMethod 'pause'
proxyReadableMethod 'pipe'
proxyReadableMethod 'unpipe'
proxyReadableMethod 'unshift'
proxyReadableMethod 'wrap'

proxyWritableMethod 'write'
proxyWritableMethod 'end'

constructor: (readable, writable) ->
super

@_bondState = {}
@_bondState.readable = readable
@_bondState.writable = writable

proxyEvent = (obj, event) =>
obj.on event, => @emit event, arguments...

proxyEvent readable, 'readable'
proxyEvent readable, 'data'
proxyEvent readable, 'end'
proxyEvent readable, 'close'
# proxyEvent readable, 'error'

proxyEvent writable, 'drain'
proxyEvent writable, 'finish'
proxyEvent writable, 'pipe'
proxyEvent writable, 'unpipe'
# proxyEvent writable, 'error'

module.exports = Bond
<小时/>

Protocol 聚合了两个内部 Transform 流 — ParserComposerParser 获取来自 aux 端的数据,并将其转换为来自 ctl 端的数据,而 Composer 则执行相反的操作。 auxctl 都是解析器和编译器的纽带,但方向不同 - 因此 aux 仅处理传入和传出的“组合”数据,而 ctl 端发出并接受“已解析”数据。我的设计决定是通过 Protocol 本身公开 ctl,并且 aux 作为实例变量可见。

协议(protocol)公开:

  • _parse_compose 作为 _transform 类方法
  • _parseEnd_composeEnd 作为 _flush 类方法
  • 解析组合为类似push的方法
  • unparseuncompose 作为类似于 unshift 的方法
'use strict'

Bond = require './bond'
BacklogTransform = require './backlog-transform'

class Protocol extends Bond
constructor: (options) ->
@_protocolState = {}
@_protocolState.options = options
parser = @_protocolState.parser = new ParserTransform @
composer = @_protocolState.composer = new ComposerTransform @

parser.__name = 'parser'
composer.__name = 'composer'

proxyEvent = (source, event) =>
source.on event, =>
@emit event, arguments...

proxyParserEvent = (event) =>
proxyEvent @_protocolState.parser, event

proxyComposerEvent = (event) =>
proxyEvent @_protocolState.composer, event

proxyParserEvent 'error'
proxyComposerEvent 'error'

super @_protocolState.parser, @_protocolState.composer
@aux = @_protocolState.aux = new Bond @_protocolState.composer, @_protocolState.parser
# @_protocolState.main = @main = new Bond @_protocolState.parser, @_protocolState.composer

parsed: (chunk, encoding) ->
@_protocolState.parser.push chunk, encoding

composed: (chunk, encoding) ->
@_protocolState.composer.push chunk, encoding

unparse: (chunk, encoding) ->
@_protocolState.parser.unshift chunk, encoding

uncompose: (chunk, encoding) ->
@_protocolState.composer.unshift chunk, encoding

#
_parse: (chunk, encoding, callback) ->
throw new TypeError 'not implemented'

_compose: (chunk, encoding, callback) ->
throw new TypeError 'not implemented'

_parseEnd: (callback) ->
callback()

_composeEnd: (callback) ->
callback()

class ParserTransform extends BacklogTransform
constructor: (@protocol) ->
options = @protocol._protocolState.options
super options, options.auxObjectMode, options.mainObjectMode

__transform: (chunk, encoding, callback) ->
@protocol._parse chunk, encoding, callback

__flush: (callback) ->
@protocol._parseEnd callback

class ComposerTransform extends BacklogTransform
constructor: (@protocol) ->
options = @protocol._protocolState.options
super options, options.mainObjectMode, options.auxObjectMode

__transform: (chunk, encoding, callback) ->
@protocol._compose chunk, encoding, callback

__flush: (callback) ->
@protocol._composeEnd callback

module.exports = Protocol
<小时/>

BacklogTransform 是实用程序类,扩展了 Transform 流,能够通过在 _transform 期间调用 unshift 方法将未转换的 block 移回到队列中,因此未移位的数据将出现在下一个 _transform 上,并添加到新 block 之前。不幸的是,实现并不像我希望的那么理想......

'use strict'

async = require 'async'
stream = require 'stream'

class BacklogTransform extends stream.Transform
constructor: (options, writableObjectMode, readableObjectMode) ->
options ?= {}

super options
@_writableState.objectMode = writableObjectMode ? options.writableObjectMode
@_readableState.objectMode = readableObjectMode ? options.readableObjectMode
@_backlogTransformState = {}
@_backlogTransformState.backlog = []

unshift: (chunk, encoding = null) ->
if @_writableState.decodeStrings
chunk = new Buffer chunk, encoding ? @_writableState.defaultEncoding

@_backlogTransformState.backlog.unshift { chunk, encoding }

_flushBacklog: (callback) ->
backlog = @_backlogTransformState.backlog

if backlog.length
if @_writableState.objectMode
async.forever(
(next) =>
return next {} if not backlog.length

{ chunk, encoding } = backlog.shift()
@__transform chunk, encoding, (err) ->
return next { err } if err?

next null

({ err }) ->
return callback err if err?

callback()
)
else
chunks = (chunk for { chunk, encoding } in backlog)

if @_writableState.decodeStrings
encoding = 'buffer'
chunk = Buffer.concat chunks
else
encoding = backlog[0].encoding
for item in backlog[1..]
if encoding != item.encoding
encoding = null
break

chunk = chunks.join ''

@_backlogTransformState.backlog = []
@__transform chunk, encoding, callback
else
callback()

_transform: (chunk, encoding, callback) ->
backlog = @_backlogTransformState.backlog

if backlog.length
backlog.push { chunk, encoding }

@_flushBacklog callback
else
@__transform chunk, encoding, callback

_flush: (callback) ->
@_flushBacklog =>
@__flush callback

__transform: (chunk, encoding, callback) ->
throw new TypeError 'not implemented'

__flush: (callback) ->
callback()

module.exports = BacklogTransform

关于javascript - 双向流转换器的设计模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24460963/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com