gpt4 book ai didi

asynchronous - 如何创建一个流,其中项目基于流之前返回的项目?

转载 作者:行者123 更新时间:2023-12-01 09:41:38 26 4
gpt4 key购买 nike

我有一个函数可以生成 futures::Stream基于一个论点。我想多次调用此函数并将流压平在一起。使问题复杂化的事实是,我想将流返回的值作为参数反馈给原始函数。

具体来说,我有一个函数可以将数字流返回为零:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}

我想从 5 开始调用这个函数。还应该为返回的每个奇数调用该函数。总集调用 numbers_down_to_zero将是:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

产生总流

4
3
2
1
0
2
1
0
0
0

有哪些技术可以实现这一点?

最佳答案

你可以用 unfold 解决这个问题.您将有一个“状态”结构,它保留“基本流”(在这种情况下向零倒计时)和将产生新流的项目列表,并将其用作 unfold 的参数。在展开时保持状态。

这样编译器就不必考虑生命周期所有权,因为状态可以移动到 async 中。每次调用闭包都会阻塞。

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
/// Helper struct to keep state while unfolding
struct StreamState<S> {
inner_stream: S,
item_queue: VecDeque<i32>,
}

// Build helper struct
let state = StreamState {
inner_stream: f(n),
item_queue: VecDeque::new(),
};

// Unfold with state
stream::unfold(state, |mut state| async move {
loop {
if let Some(item) = state.inner_stream.next().await {
// Iterate inner stream, and potentially push item to queue
if item % 2 == 1 {
state.item_queue.push_front(item);
}
break Some((item, state));
} else if let Some(item) = state.item_queue.pop_back() {
// If inner stream is exhausted, produce new stream from queue
// and repeat loop
state.inner_stream = f(item);
} else {
// If queue is empty, we are done
break None;
}
}
})
}

Full playground example

StreamExt::next 要求内部流实现 Unpin ,因此它不适用于任意流。您可以随时使用 Box::pin(stream)相反,因为 Pin<Box<T>>Unpin和实现 Stream如果 T: Stream .

关于asynchronous - 如何创建一个流,其中项目基于流之前返回的项目?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59697795/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com