gpt4 book ai didi

c# - 在返回 WCF 流的同时流式传输到文件?

转载 作者:行者123 更新时间:2023-11-30 16:33:26 25 4
gpt4 key购买 nike

我有一个 WCF 服务,可以从远程 FTP 服务器流式传输大文件 (100MB+)。

/// <summary>
/// WCF service contract with a single operation that hands data back to the caller as a
/// <see cref="Stream"/>.
/// </summary>
[ServiceContract]
public interface IStreamService
{
/// <summary>
/// Returns the requested data as a stream.
/// </summary>
/// <returns>A stream over the data; the implementation decides where it comes from.</returns>
[OperationContract]
Stream GetDataFromFtp();
}

/// <summary>
/// Baseline implementation of <see cref="IStreamService"/>: returns the stream obtained from
/// Retr(...) directly, with no local caching.
/// </summary>
public class StreamService : IStreamService
{
/// <summary>
/// Obtains a stream via Retr(...) and returns it unchanged.
/// </summary>
public Stream GetDataFromFtp()
{
// Retr(...) is elided in this snippet; presumably it opens the remote FTP download
// stream - confirm against the real implementation.
Stream ftpStream = Retr(...);
return ftpStream;
}
}

当 WCF 将文件流式传输到客户端时,我想将它流式传输到本地缓存中,这样以后的请求就不需要一路返回到远程 FTP - 我可以直接从磁盘上为它们提供服务。

我想在不先把整个 100MB 文件流缓冲到内存中再返回的前提下实现这一点,但遇到了问题。

我尝试使用一个简单的捕获流包装器为每次读取执行写入:

/// <summary>
/// Stream wrapper that, on every read, writes the bytes it just read from the wrapped
/// stream into a second "capture" stream.
/// NOTE(review): this snippet is abbreviated - no constructor assigning the two readonly
/// fields and none of the other abstract <see cref="Stream"/> members are shown.
/// </summary>
public class CapturingStreamWrapper : Stream
{
// Stream the caller actually reads from.
private readonly Stream stream;
// Stream that receives a copy of every byte read.
private readonly Stream captureStream;

/// <summary>
/// Reads from the wrapped stream into <paramref name="buffer"/> and writes the same bytes
/// to the capture stream.
/// </summary>
/// <returns>The number of bytes read from the wrapped stream.</returns>
public override int Read(byte[] buffer, int offset, int count)
{
int readBytes = stream.Read(buffer, offset, count);
// Only the bytes actually read (readBytes, which may be less than count) are written.
captureStream.Write(buffer, offset, readBytes);

return readBytes;
}
}

/// <summary>
/// Variant of the service that wraps the stream from Retr(...) in a
/// <see cref="CapturingStreamWrapper"/> so the bytes read are also written to a file.
/// </summary>
public class StreamService : IStreamService
{
/// <summary>
/// Opens the source stream and a file stream for writing, and returns a wrapper over both.
/// </summary>
public Stream GetDataFromFtp()
{
// Both argument lists are elided in this snippet.
Stream ftpStream = Retr(...);
Stream cacheStream = File.OpenWrite(...);
// NOTE(review): nothing in this snippet closes or flushes cacheStream, or removes the
// file if the transfer is interrupted.
return new CapturingStreamWrapper(ftpStream, cacheStream);
}
}

但这似乎不起作用。

此外,这没有提供任何错误处理——如果向客户端的传输失败,我需要一个 catch 块来删除写了一半的缓存文件(即事务性缓存)。我不确定该如何实现,因为我不知道在 WCF 生命周期中流是何时被调用/清理的。

有什么想法可以在流回客户端时流式传输到文件中吗?

最佳答案

我最终写了几个相互关联的流类——其中一个在读取时会把数据通过管道传给另一个。抱歉贴了这么大一段代码:

/// <summary>
/// A read-only, forward-only stream wrapper that, as it is read, makes a copy of every
/// chunk available on a second stream (<see cref="OutputStream"/>).
/// Each successful read raises <see cref="BytesRead"/>; closing the stream raises
/// <see cref="Closing"/> and then disposes the wrapped stream.
/// </summary>
public class CacheStream : Stream
{
    // The wrapped source stream. This wrapper owns it and disposes it in Dispose(bool).
    private readonly Stream stream;

    /// <summary>
    /// Wraps <paramref name="stream"/> and creates the paired <see cref="OutputStream"/>.
    /// </summary>
    /// <param name="stream">The source stream to read from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public CacheStream(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        this.stream = stream;
        OutputStream = new CacheOutputStream(this);
    }

    /// <summary>Raised after each read that returned at least one byte.</summary>
    // Initialised with a no-op handler so the events can be raised without a null check.
    public event EventHandler<BytesReadEventArgs> BytesRead = delegate { };

    /// <summary>Raised when <see cref="Close"/> is called, before the stream is disposed.</summary>
    public event EventHandler Closing = delegate { };

    /// <summary>The stream on which a copy of all bytes read so far can be consumed.</summary>
    public Stream OutputStream { get; private set; }

    /// <summary>Flushes the wrapped stream.</summary>
    public override void Flush()
    {
        stream.Flush();
    }

    /// <summary>Not supported; always throws.</summary>
    /// <exception cref="InvalidOperationException">Always.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new InvalidOperationException("Cannot seek in CachingStream.");
    }

    /// <summary>Forwards the call to the wrapped stream.</summary>
    public override void SetLength(long value)
    {
        stream.SetLength(value);
    }

    /// <summary>
    /// Reads from the wrapped stream and publishes a copy of the bytes read through
    /// <see cref="BytesRead"/>.
    /// </summary>
    /// <returns>The number of bytes read from the wrapped stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int numberOfBytesRead = stream.Read(buffer, offset, count);

        // Nothing to publish when the read returned no bytes.
        if (numberOfBytesRead > 0)
            PipeToOutputStream(buffer, offset, numberOfBytesRead);

        return numberOfBytesRead;
    }

    // Copies the freshly read bytes into their own array before raising the event,
    // because the caller is free to reuse 'buffer' as soon as Read returns.
    private void PipeToOutputStream(byte[] buffer, int offset, int numberOfBytesRead)
    {
        var tmp = new byte[numberOfBytesRead];
        Array.Copy(buffer, offset, tmp, 0, numberOfBytesRead);
        BytesRead(this, new BytesReadEventArgs(tmp));
    }

    /// <summary>Not supported; always throws.</summary>
    /// <exception cref="InvalidOperationException">Always.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new InvalidOperationException("Cannot write in CachingStream.");
    }

    /// <summary>Mirrors <see cref="Stream.CanRead"/> of the wrapped stream.</summary>
    public override bool CanRead
    {
        get { return stream.CanRead; }
    }

    /// <summary>Always false.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Always false.</summary>
    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>Length of the wrapped stream.</summary>
    public override long Length
    {
        get { return stream.Length; }
    }

    /// <summary>Position of the wrapped stream. Setting it is not supported.</summary>
    /// <exception cref="InvalidOperationException">On any attempt to set the position.</exception>
    public override long Position
    {
        get { return stream.Position; }
        set { throw new InvalidOperationException("Cannot set position in CachingStream."); }
    }

    /// <summary>
    /// Raises <see cref="Closing"/> and then closes the stream.
    /// </summary>
    public override void Close()
    {
        try
        {
            Closing(this, EventArgs.Empty);
        }
        finally
        {
            // Release resources even if a Closing subscriber throws.
            base.Close();
        }
    }

    /// <summary>
    /// Disposes the wrapped source stream and the output stream.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
            {
                // Previously the wrapped stream was never closed, leaking the underlying
                // resource once the consumer closed this wrapper.
                stream.Dispose();
                OutputStream.Dispose();
            }
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}

/// <summary>
/// Output portion of <see cref="CacheStream"/>. Serves, in order, the byte chunks that the
/// source stream publishes through its BytesRead event, and reports end-of-stream once
/// the source has raised Closing and every queued chunk has been consumed.
/// All mutable state is guarded by a single lock, so the producer (the thread reading the
/// source) and the consumer (the thread reading this stream) may be different threads.
/// NOTE: the queue is unbounded; a consumer that is slower than the producer lets the
/// pending chunks accumulate in memory.
/// </summary>
public class CacheOutputStream : Stream
{
    // Every field below is read and written only while holding lock (buffers).
    // Counters are long so streams larger than 2 GB do not overflow.
    private long position;
    private long length;
    private bool sourceIsClosed;

    // Number of bytes of the first queued chunk that have already been handed out.
    private int headOffset;

    // No Deque<T> in the BCL yet, but LinkedList is more or less the same.
    private readonly LinkedList<byte[]> buffers = new LinkedList<byte[]>();

    /// <summary>
    /// Subscribes to the BytesRead and Closing events of <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">The source whose reads are mirrored here.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public CacheOutputStream(CacheStream stream)
    {
        if (stream == null) throw new ArgumentNullException("stream");

        stream.BytesRead += (o, e) => AddToQueue(e.Buffer);
        stream.Closing += (o, e) => MarkSourceClosed();
    }

    // Appends a chunk to the queue and wakes any reader waiting for data.
    private void AddToQueue(byte[] buffer)
    {
        if (buffer.Length == 0)
            return;

        lock (buffers)
        {
            buffers.AddLast(buffer);
            length += buffer.Length;
            Monitor.PulseAll(buffers);
        }
    }

    // Records that no more chunks will arrive and wakes any reader waiting for data.
    private void MarkSourceClosed()
    {
        lock (buffers)
        {
            sourceIsClosed = true;
            Monitor.PulseAll(buffers);
        }
    }

    /// <summary>
    /// Copies up to <paramref name="count"/> bytes from the oldest queued chunk into
    /// <paramref name="buffer"/>. Blocks while the queue is empty and the source is still open.
    /// </summary>
    /// <param name="buffer">Destination array.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start writing.</param>
    /// <param name="count">Maximum number of bytes to copy.</param>
    /// <returns>
    /// The number of bytes copied; 0 when <paramref name="count"/> is 0 or when the source
    /// is closed and the queue has been drained.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is negative.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="buffer"/> is too small for <paramref name="offset"/> plus <paramref name="count"/>.
    /// </exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate before touching the queue so a bad call can never lose queued data.
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (offset < 0) throw new ArgumentOutOfRangeException("offset");
        if (count < 0) throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count)
            throw new ArgumentException("Offset and count exceed the length of the buffer.");

        if (count == 0)
            return 0;

        lock (buffers)
        {
            // "Queue empty" and "source closed" are evaluated under the same lock that the
            // producer takes, so a final chunk added just before closing cannot be missed.
            while (buffers.Count == 0)
            {
                if (sourceIsClosed)
                    return 0;

                // Releases the lock while waiting; woken by AddToQueue or MarkSourceClosed.
                Monitor.Wait(buffers);
            }

            byte[] currentBuffer = buffers.First.Value;
            int numberOfBytesRead = Math.Min(count, currentBuffer.Length - headOffset);

            Array.Copy(currentBuffer, headOffset, buffer, offset, numberOfBytesRead);
            headOffset += numberOfBytesRead;

            // Drop the chunk once fully consumed; otherwise the remainder stays at the
            // front of the queue for the next call.
            if (headOffset == currentBuffer.Length)
            {
                buffers.RemoveFirst();
                headOffset = 0;
            }

            position += numberOfBytesRead;
            return numberOfBytesRead;
        }
    }

    /// <summary>Does nothing; there is nothing to flush.</summary>
    public override void Flush() { }

    /// <summary>Not supported; always throws.</summary>
    /// <exception cref="InvalidOperationException">Always.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new InvalidOperationException("Cannot seek in CachingStream.");
    }

    /// <summary>Not supported; always throws.</summary>
    /// <exception cref="InvalidOperationException">Always.</exception>
    public override void SetLength(long value)
    {
        throw new InvalidOperationException("Cannot set length in CachingStream.");
    }

    /// <summary>Not supported; always throws.</summary>
    /// <exception cref="InvalidOperationException">Always.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new InvalidOperationException("Cannot write in a CachingStream.");
    }

    /// <summary>Always true.</summary>
    public override bool CanRead
    {
        get { return true; }
    }

    /// <summary>Always false.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Always false.</summary>
    public override bool CanWrite
    {
        get { return false; }
    }

    /// <summary>Total number of bytes queued so far (consumed or not).</summary>
    public override long Length
    {
        get { lock (buffers) return length; }
    }

    /// <summary>Number of bytes handed out by <see cref="Read"/> so far. Setting it is not supported.</summary>
    /// <exception cref="InvalidOperationException">On any attempt to set the position.</exception>
    public override long Position
    {
        get { lock (buffers) return position; }
        set { throw new InvalidOperationException("Cannot set position in CachingStream."); }
    }
}

关于c# - 在返回 WCF 流的同时流式传输到文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3202594/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com