gpt4 book ai didi

Swiftui 任务未并行运行

转载 作者:行者123 更新时间:2023-12-05 04:27:54 25 4
gpt4 key购买 nike

我在 SwiftUI View 中并行运行多项任务时遇到问题。

struct ModelsView: View {
@StateObject var tasks = TasksViewModel()

var body: some View {
NavigationView{
ScrollView {
ForEach(Array(zip(tasks.tasks.indices, tasks.tasks)), id: \.0) { task in
NavigationLink(destination: ModelView()) {
ModelPreviewView(model_name: "3dobject.usdz")
.onAppear {
if task.0 == tasks.tasks.count - 2 {
Task {
print(tasks.tasks.count)
await tasks.fetch_tasks(count: 4)
}
}
}
}
}
}.navigationTitle("3D modelle")
}.onAppear{
Task {
await tasks.fetch_tasks(count: 5)
await tasks.watch_for_new_tasks()
}
}
}
}

在我看来,我会在 View 出现时立即生成一个任务,它首先从数据库中获取 5 个任务(这工作正常),然后它开始监视新任务。

在 ScrollView 中,就在到达底部之前,我开始加载新任务。问题是,异步函数 fetch_tasks(count: 4) 只有在异步函数 watch_for_new_tasks() 停止阻塞时才会继续。

actor TasksViewModel: ObservableObject {
@MainActor @Published private(set) var tasks : [Tasks.Task] = []

private var last_fetched_id : String? = nil

func fetch_tasks(count: UInt32) async {
do {
let tasks_data = try await RedisClient.shared.xrevrange(streamName: "tasks", end: last_fetched_id ?? "+" , start: "-", count: count)
last_fetched_id = tasks_data.last?.id
let fetched_tasks = tasks_data.compactMap { Tasks.Task(from: $0.data) }

await MainActor.run {
withAnimation(.easeInOut) {
self.tasks.append(contentsOf: fetched_tasks)
}
}
} catch {
print("Error fetching taskss \(error)")
}
}

func watch_for_new_tasks() async {
while !Task.isCancelled {
do {

let tasks_data = try await RedisClient.shared.xread(streams: "tasks", ids: "$")
let new_tasks = tasks_data.compactMap { Tasks.Task(from: $0.data) }

await MainActor.run {
for new_task in new_tasks.reversed() {
withAnimation {
self.tasks.insert(new_task, at: 0)
}
}
}
} catch {
print(error)
}
}
}
...
}

异步函数 watch_for_new_tasks() 使用 RedisClient.shared.xread(streams: "tasks", ids: "$") 阻塞,直到至少有一个任务是添加到 Redis 流。

这是我的redis客户端:

class RedisClient {
typealias Stream = Array<StreamElement>

static let shared = RedisClient(host: "127.0.0.1", port: 6379)

let connection: Redis

let host: String
let port: Int32

init(host: String, port: Int32) {
connection = Redis()
self.host = host
self.port = port

connection.connect(host: host, port: port) {error in
if let err = error {
print(err)
}
}
}

func connect() {
connection.connect(host: self.host, port: self.port) {error in
if let err = error {
print(err)
}
}
}

func xrevrange(streamName: String, end: String, start: String, count: UInt32 = 0) async throws -> Stream {
try await withCheckedThrowingContinuation { continuation in
connection.issueCommand("xrevrange", streamName, end, start, "COUNT", String(count)) { res in
switch res {
case .Array(let data):
continuation.resume(returning: data.compactMap { StreamElement(from: $0) } )
case .Error(let error):
continuation.resume(throwing: ResponseError.RedisError(error))
case _:
continuation.resume(throwing: ResponseError.WrongData("Expected Array"))
}
}
}
}

func xread(streams: String..., ids: String..., block: UInt32 = 0, count: UInt32 = 0) async throws -> Stream {
return try await withCheckedThrowingContinuation({ continuation in
var args = ["xread", "BLOCK", String(block),"COUNT", String(count),"STREAMS"]
args.append(contentsOf: streams)
args.append(contentsOf: ids)
connection.issueCommandInArray(args){ res in
print(res)
switch res.asArray?[safe: 0]?.asArray?[safe: 1] ?? .Error("Expected response to be an array") {
case .Array(let data):
continuation.resume(returning: data.compactMap { StreamElement(from: $0) } )
case .Error(let error):
continuation.resume(throwing: ResponseError.RedisError(error))
case _:
continuation.resume(throwing: ResponseError.WrongData("Expected Array"))
}
}
})
}

func xreadgroup(group: String, consumer: String, count: UInt32 = 0, block: UInt32 = 0, streams: String..., ids: String..., noAck: Bool = true) async throws -> Stream {
try await withCheckedThrowingContinuation({ continuation in
var args = ["xreadgroup", "GROUP", group, consumer, "COUNT", String(count), "BLOCK", noAck ? nil : "NOACK", String(block), "STREAMS"].compactMap{ $0 }
args.append(contentsOf: streams)
args.append(contentsOf: ids)
connection.issueCommandInArray(args){ res in
print(res)
switch res.asArray?[safe: 0]?.asArray?[safe: 1] ?? .Error("Expected response to be an array") {
case .Array(let data):
continuation.resume(returning: data.compactMap { StreamElement(from: $0) } )
case .Error(let error):
continuation.resume(throwing: ResponseError.RedisError(error))
case _:
continuation.resume(throwing: ResponseError.WrongData("Expected Array"))
}
}
})
}

enum ResponseError: Error {
case RedisError(String)
case WrongData(String)
}

struct StreamElement {
let id: String
let data: [RedisResponse]

init?(from value: RedisResponse) {
guard
case .Array(let values) = value,
let id = values[0].asString,
let data = values[1].asArray
else { return nil }

self.id = id.asString
self.data = data
}
}
}

我尝试在 Task.detached 任务上运行 watch_for_new_tasks(),但这也会阻塞。

老实说,我不知道为什么会阻塞,如果可以的话,我可以求助于你。

提前致谢

迈克尔

最佳答案

.onAppear {
Task {
await tasks.fetch_tasks(count: 5)
await tasks.watch_for_new_tasks()
}
}
  1. 这不会并行运行任务。要执行第二个 await,第一个必须完成。
  2. 你可以使用.task修饰符

您的代码可以重构为并行运行 2 个异步函数:

.task {
async let fetchTask = tasks.fetch_tasks(count: 5)
async let watchTask = tasks.watch_for_new_tasks()
}

你可以这样做:

await [fetchTask, watchTask]

如果你需要在它们都完成后做某事

关于Swiftui 任务未并行运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72745377/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com