gpt4 book ai didi

c# - 保存订阅状态以便稍后恢复

转载 作者:太空宇宙 更新时间:2023-11-03 13:40:10 26 4
gpt4 key购买 nike

更新 - 已解决

最终的解决方案与 Brandon 的建议略有不同,但他的回答使我走上了正确的道路。

class State
{
public int Offset { get; set; }
public HashSet<string> UniqueImageUrls = new HashSet<string>();
}

public IObservable<TPicture> GetPictures(ref object _state)
{
var localState = (State) _state ?? new State();
_state = localState;

return Observable.Defer(()=>
{
return Observable.Defer(() => Observable.Return(GetPage(localState.Offset)))
.SubscribeOn(TaskPoolScheduler.Default)
.Do(x=> localState.Offset += 20)
.Repeat()
.TakeWhile(x=> x.Count > 0)
.SelectMany(x=> x)
.Where(x=> !localState.UniqueImageUrls.Contains(x.ImageUrl))
.Do(x=> localState.UniqueImageUrls.Add(x.ImageUrl));
});
}

IList<TPicture> GetPage(int offset)
{
...
return result;
}

原始问题

我目前正在努力解决以下问题。下面显示的 PictureProvider 实现使用偏移量变量,该变量用于提供实际数据的后端服务的分页结果。我想要实现的是一个优雅的解决方案,使当前偏移量可供可观察对象的消费者使用,以允许稍后以正确的偏移量恢复可观察序列。恢复已由 GetPictures() 的 intialState 参数说明。

在更像 RX 的时尚中改进代码的建议也将受到欢迎。我实际上不太确定 Task.Run() 的东西在这里是否合适。

  public class PictureProvider :
IPictureProvider<Picture>
{
#region IPictureProvider implementation

public IObservable<Picture> GetPictures(object initialState)
{
return Observable.Create<Picture>((IObserver<Picture> observer) =>
{
var state = new ProducerState(initialState);
ProducePictures(observer, state);
return state;
});
}

#endregion

void ProducePictures(IObserver<Picture> observer, ProducerState state)
{
Task.Run(() =>
{
try
{
while(!state.Terminate.WaitOne(0))
{
var page = GetPage(state.Offset);

if(page.Count == 0)
{
observer.OnCompleted();
break;
}

else
{
foreach(var picture in page)
observer.OnNext(picture);


state.Offset += page.Count;
}
}
}

catch (Exception ex)
{
observer.OnError(ex);
}

state.TerminateAck.Set();
});
}

IList<Picture> GetPage(int offset)
{
var result = new List<Picture>();

... boring web service call here

return result;
}

public class ProducerState :
IDisposable
{
public ProducerState(object initialState)
{
Terminate = new ManualResetEvent(false);
TerminateAck = new ManualResetEvent(false);

if(initialState != null)
Offset = (int) initialState;
}

public ManualResetEvent Terminate { get; private set; }
public ManualResetEvent TerminateAck { get; private set; }

public int Offset { get; set; }

#region IDisposable implementation

public void Dispose()
{
Terminate.Set();
TerminateAck.WaitOne();

Terminate.Dispose();
TerminateAck.Dispose();
}

#endregion
}
}

最佳答案

我建议重构您的界面以将状态作为数据的一部分生成。现在,客户可以从中断的地方重新订阅。

此外,一旦您开始使用 Rx,您应该会发现使用像 ManualResetEvent 这样的同步原语很少需要。如果重构代码以便检索每个页面都是它自己的 Task ,那么您可以消除所有同步代码。

此外,如果您在 GetPage 中调用“无聊的 Web 服务” ,然后让它异步。这消除了调用 Task.Run 的需要以及其他好处。

这是一个重构版本,使用 .NET 4.5 async/await 语法。它也可以在没有异步/等待的情况下完成。我还添加了一个 GetPageAsync使用 Observable.Run 的方法以防万一您真的无法将 Web 服务调用转换为异步

/// <summary>A set of pictures</summary>
public struct PictureSet
{
public int Offset { get; private set; }
public IList<Picture> Pictures { get; private set; }

/// <summary>Clients will use this property if they want to pick up where they left off</summary>
public int NextOffset { get { return Offset + Pictures.Count; } }
public PictureSet(int offset, IList<Picture> pictures)
:this() { Offset = offset; Pictures = pictures; }
}

public class PictureProvider : IPictureProvider<PictureSet>
{
public IObservable<PictureSet> GetPictures(int offset = 0)
{
// use Defer() so we can capture a copy of offset
// for each observer that subscribes (so multiple
// observers do not update each other's offset
return Observable.Defer<PictureSet>(() =>
{
var localOffset = offset;
// Use Defer so we re-execute GetPageAsync()
// each time through the loop.
// Update localOffset after each GetPageAsync()
// completes so that the next call to GetPageAsync()
// uses the next offset
return Observable.Defer(() => GetPageAsync(localOffset))
.Select(pictures =>
{
var s = new PictureSet(localOffset, pictures);
localOffset += pictures.Count;
})
.Repeat()
.TakeWhile(pictureSet => pictureSet.Pictures.Count > 0);
});
}

private async Task<IList<Picture>> GetPageAsync(int offset)
{
var data = await BoringWebServiceCallAsync(offset);
result = data.Pictures.ToList();
}

// this version uses Observable.Run() (which just uses Task.Run under the hood)
// in case you cannot convert your
// web service call to be asynchronous
private IObservable<IList<Picture>> GetPageAsync(int offset)
{
return Observable.Run(() =>
{
var result = new List<Picture>();
... boring web service call here
return result;
});
}
}

客户只需要添加一个SelectMany打电话得到他们的IObservable<Picture> .他们可以选择存储 pictureSet.NextOffset如果他们愿意的话。

pictureProvider
.GetPictures()
.SelectMany(pictureSet => pictureSet.Pictures)
.Subscribe(picture => whatever);

关于c# - 保存订阅状态以便稍后恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17390453/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com