gpt4 book ai didi

multithreading - Npgsql 事件处理和并发

转载 作者:行者123 更新时间:2023-12-04 04:34:02 25 4
gpt4 key购买 nike

我想将现有的事件实现替换为可以在事件进入时处理事件的实现,如有必要,可以同时处理。不幸的是,我以前从未实现过任何类型的并发,但必须从某个地方开始,嗯?

我一直在阅读 Functional Reactive Programming .尽管看起来相对简单的示例,但我对如何将这些示例应用到我的应用程序中感到困惑。事实上,我什至不确定这是否是这里实现并发最合适的方式。

我还能够以某种方式利用任务并行库 (TPL),因为它提供线程管理,所以我不需要太担心产生太多线程。遗憾的是,上述链接中的示例不包含任何使用 TPL 的示例。

我当前的代码如下。我仍然是一个新手程序员,需要一些帮助才能通过。为麻烦道歉。 :(

open System
open System.Threading
open Npgsql

// This application is a Windows service. PostgreSQL sends a notice whenever a new row has been added to a table.

// The next function processes new rows, aka tasks.
let private processTask () =
EventLog.writeEventLog "Information" "Received new task notification."
// Task processing yet to be implemented.

// A connection to PostgreSQL that stays open while service is running. Receives notifications.
let newNotifyConnection (host : string, username : string, password : string, database : string) =
let connectionString = sprintf "Host=%s;Username=%s;Password=%s;Database=%s;ContinuousProcessing=true;Keepalive=120;CommandTimeout=0" host username password database
new NpgsqlConnection(connectionString)

let private notifyConnection = newNotifyConnection Settings.npgsqlConnection
let private listen = new NpgsqlCommand("LISTEN newtask", notifyConnection)

// Event for receiving and processing notifications.
let private onNotification (sender : obj) (e : NpgsqlNotificationEventArgs) =
processTask()

let private notificationEventHandler = new NotificationEventHandler(onNotification)

// Run this function when service starts.
let startWorker () =
notifyConnection.Notification.AddHandler(notificationEventHandler)
notifyConnection.Open()
listen.ExecuteNonQuery() |> ignore

// Run this function when service stops.
let stopWorker () =
listen.Dispose()
notifyConnection.Dispose()

最佳答案

如果您愿意先使用 F# 和 RX 走函数式编程路线,我建议安装 Nuget 提供的 FSharp.Control.Reactive。它将使使用 RX 变得更加容易,包括向 Observable 模块添加更多方法(默认情况下,F# 仅包含一个子集)。

对我来说,代码中的关键方法如下,因为这是将回调链接到事件源的方法。

let startWorker () =
notifyConnection.Notification.AddHandler(notificationEventHandler)
notifyConnection.Open()
listen.ExecuteNonQuery() |> ignore

诀窍是为您的 notifyConnection 对象提供 RX 提供的回调对象,可以说是 Npgsql 库和 RX 框架之间的桥梁。

就像是:
open System
open System.Reactive.Linq

let observableCreate subscriptionFunction = Observable.Create(new Func<_, IDisposable>(subscriptionFunction))

let sourceObservable = observableCreate (fun observer ->
// Your notifyConnection object will now direct all events to the observer provided
notifyConnection.Notification.AddHandler(observer.OnNext)
notifyConnection.Open()
listen.ExecuteNonQuery() |> ignore
// RX includes the CompositeDisposable class which you can also use
{ new IDisposable with
x.Dispose() =
listen.Dispose()
notifyConnection.Dispose() })

use subscription = sourceObservable.Subscribe(notifyConnectionHandler)

请注意,FSharp.Control.Reactive Nuget 包提供 Observable.create 函数。 (编辑:有人指出 Reactive 包没有 Observable.Create 的包装器,所以我在自己上面定义了它)。

以上将(在订阅时 - 使用 Subscribe() 方法):
- 打开连接
- 执行查询
- 创建一个一次性的,以便在处理订阅时使用。

更有趣的是,您可以使用 Observable 模块中的方法链接可观察对象并与其他对象组合,从而允许您创建响应式(Reactive)程序。如果您有兴趣在 F# 中使用 IEvent/IObservable,那么阅读 RX 可能是明智的。

关于multithreading - Npgsql 事件处理和并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32709261/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com