
go - Recommended way to close redundant sql.Rows objects after goroutines

Reposted. Author: 行者123. Updated: 2023-12-01 20:22:30

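I'm using goroutines to send queries in parallel to PostgreSQL master and slave nodes. The first host that returns a valid result wins. Error cases are outside the scope of this question.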

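The caller is the only one that cares about the contents of the *sql.Rows objects, so my function intentionally does nothing with them. I use buffered channels to retrieve the returned objects from the goroutines, so there should be no goroutine leak. Garbage collection should take care of the rest.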

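There is one problem I haven't solved properly: the Rows objects left in the channel are never closed. When I call this function from a (read-only) transaction, tx.Rollback() returns an error for every unclosed Rows instance: "unexpected command tag SELECT".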
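The buffered-channel reasoning can be checked in isolation. The following is a minimal sketch that uses plain ints in place of *sql.Rows (an assumption, so it runs without a database); sendAll is a hypothetical helper. All senders finish, so no goroutine leaks, but every unread value stays in the channel. With *sql.Rows those leftovers are exactly the unclosed result sets that make Rollback() fail.

```go
package main

import (
	"fmt"
	"sync"
)

// sendAll starts n goroutines that each send one value into a channel
// buffered to n, then receives only the first value, mirroring the
// "first result wins" loop in multiQuery.
func sendAll(n int) (first int, leftovers int) {
	ch := make(chan int, n) // capacity n: no sender can block
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			ch <- v
		}(i)
	}
	first = <-ch
	wg.Wait() // returns because every send fits in the buffer
	return first, len(ch)
}

func main() {
	_, left := sendAll(3)
	fmt.Println("values left in channel:", left) // values left in channel: 2
}
```

Nothing ever reads or releases those two remaining values; for ints that is harmless, for *sql.Rows it is not.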

This function is called from higher-level objects:

func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
	rc := make(chan *sql.Rows, len(xs))
	ec := make(chan error, len(xs))
	for _, x := range xs {
		go func(x executor) {
			rows, err := x.QueryContext(ctx, query, args...)
			switch { // Make sure only one of them is returned
			case err != nil:
				ec <- err
			case rows != nil:
				rc <- rows
			}
		}(x)
	}

	var me MultiError
	for i := 0; i < len(xs); i++ {
		select {
		case err := <-ec:
			me.append(err)
		case rows := <-rc: // Return on the first success
			return rows, nil
		}
	}
	return nil, me.check()
}

An executor can be a *sql.DB, a *sql.Tx, or anything else that satisfies the interface:
type executor interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

Rollback logic:
func (mtx MultiTx) Rollback() error {
	ec := make(chan error, len(mtx))
	for _, tx := range mtx {
		go func(tx *Tx) {
			err := tx.Rollback()
			ec <- err
		}(tx)
	}
	var me MultiError
	for i := 0; i < len(mtx); i++ {
		if err := <-ec; err != nil {
			me.append(err)
		}
	}
	return me.check()
}
MultiTx is a collection of open transactions on multiple nodes. It is the higher-level object that calls multiQuery.

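What is the best way to "clean up" the unused Rows? These are the options I have considered and decided against:
  • Cancelling the context: I believe it would work inconsistently, because several queries may already have returned by the time cancel() is called.
  • Creating a deferred goroutine that keeps draining the channel and closing the Rows objects: if a DB node is slow to respond, Rollback() is still called before rows.Close().
  • Using a sync.WaitGroup somewhere in the MultiTx type, possibly combined with the previous option: this could make Rollback() hang if one of the nodes is unresponsive. Also, I'm not sure how I would implement it.
  • Ignoring the Rollback errors: ignoring errors never sounds like a good idea; they exist for a reason.

What is the recommended way to solve this?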

Edit:

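As @Peter suggested, I tried cancelling the context, but that also seems to invalidate all the rows returned by the query. On rows.Scan I get a "context canceled" error in the higher-level caller.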

This is what I have done so far:
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	rc := make(chan *sql.Rows, len(xs))
	ec := make(chan error, len(xs))
	for _, x := range xs {
		go func(x executor) {
			rows, err := x.QueryContext(ctx, query, args...)
			switch { // Make sure only one of them is returned
			case err != nil:
				ec <- err
			case rows != nil:
				rc <- rows
				cancel() // Cancel on success
			}
		}(x)
	}

	var (
		me   MultiError
		rows *sql.Rows
	)
	for i := 0; i < len(xs); i++ {
		select {
		case err := <-ec:
			me.append(err)
		case r := <-rc:
			if rows == nil { // Only use the first rows
				rows = r
			} else {
				r.Close() // Cleanup remaining rows, if there are any
			}
		}
	}
	if rows != nil {
		return rows, nil
	}

	return nil, me.check()
}

Edit 2:

@Adrian mentioned:

"we can't see the code that's actually using any of this."

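This code is reused by the methods of two types. First, the transaction type; the problem in this question shows up in its Rollback() method above.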
// MultiTx holds a slice of open transactions to multiple nodes.
// All methods on this type run their sql.Tx variant in one goroutine per Node.
type MultiTx []*Tx

// QueryContext runs sql.Tx.QueryContext on the transactions in separate goroutines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mtx MultiTx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	return multiQuery(ctx, mtx2Exec(mtx), query, args...)
}

And then there is:
// MultiNode holds a slice of Nodes.
// All methods on this type run their sql.DB variant in one goroutine per Node.
type MultiNode []*Node

// QueryContext runs sql.DB.QueryContext on the Nodes in separate goroutines.
// The first non-error result is returned immediately
// and errors from the other Nodes will be ignored.
//
// If all nodes respond with the same error, that exact error is returned as-is.
// If there is a variety of errors, they will be embedded in a MultiError return.
//
// Implements boil.ContextExecutor.
func (mn MultiNode) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	return multiQuery(ctx, nodes2Exec(mn), query, args...)
}
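The helpers mtx2Exec and nodes2Exec are not shown. They are needed because Go slices are not covariant: a []*Tx cannot be converted to []executor directly, even though each *Tx satisfies executor. A sketch of mtx2Exec, assuming (hypothetically) that Tx is a struct embedding *sql.Tx; nodes2Exec would follow the same pattern.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

type executor interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

// Tx is a hypothetical stand-in for the question's Tx type; embedding
// *sql.Tx promotes its methods, so *Tx satisfies executor.
type Tx struct{ *sql.Tx }

type MultiTx []*Tx

// mtx2Exec copies the slice element by element, boxing each *Tx in an
// executor interface value.
func mtx2Exec(mtx MultiTx) []executor {
	xs := make([]executor, len(mtx))
	for i, tx := range mtx {
		xs[i] = tx
	}
	return xs
}

func main() {
	mtx := MultiTx{&Tx{}, &Tx{}}
	fmt.Println(len(mtx2Exec(mtx))) // 2
}
```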

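These methods are public wrappers around the multiQuery() function. I now realize that simply sending the *Rows into a buffered channel to die does not work, and is in fact a memory leak. In the transaction case this becomes obvious, because Rollback() starts complaining. But in the non-transaction variant, the *Rows inside the channel will never be garbage collected either, since the driver may keep a reference to it until rows.Close() is called.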

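I wrote this package to be used by an ORM, sqlboiler. My higher-level logic passes a MultiTx object to the ORM. From that point on, I have no explicit control over the returned Rows. A simple approach would be for my higher-level code to cancel the context before Rollback(), but I don't like that: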
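  • It gives a non-intuitive API. This (idiomatic) pattern would break:

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	tx, _ := db.BeginTx(ctx, nil)
	defer tx.Rollback()

  • The ORM's interface also specifies the regular, non-context-aware Query() variants, which in my package run against context.Background().

I'm starting to worry that this is broken by design... Anyway, I'll start by implementing a goroutine that drains the channel and closes the *Rows. After that, I'll see whether I can implement a reasonable waiting/cancellation mechanism that doesn't affect the returned *Rows.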
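The reason the idiomatic snippet in the first point breaks is defer ordering: deferred calls run last-in, first-out, so tx.Rollback() runs before cancel(), while any redundant *sql.Rows tied to the context are still open. A tiny sketch that records the order; deferOrder is a hypothetical helper and the labels only stand for the two calls:

```go
package main

import "fmt"

// deferOrder records the order in which two deferred calls run. The labels
// stand for the cancel() and tx.Rollback() calls in the snippet above.
func deferOrder() []string {
	var order []string
	func() {
		defer func() { order = append(order, "cancel") }()   // deferred first
		defer func() { order = append(order, "rollback") }() // deferred second
	}()
	return order
}

func main() {
	fmt.Println(deferOrder()) // [rollback cancel]
}
```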

Best answer

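I believe the function below will do what you need, with the caveat that the context passed in should be cancelled when you are done with the results (otherwise one context.WithCancel will leak; I cannot see a way around that, because cancelling it within the function would invalidate the returned sql.Rows).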

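Note that I have not had time to test this (that would require setting up a database, implementing the interfaces, etc.), so there may be a bug hiding in the code, but I believe the basic algorithm is sound.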

// queryResult holds the goroutine number and the result from that goroutine (we need both so we can avoid cancelling the relevant context)
type queryResult struct {
	no   int
	rows *sql.Rows
}

// multiQuery executes multiple queries and returns either the first to return a result or, if all fail, a MultiError summarising the errors.
// Important: this should be used for READ-ONLY queries only (it is possible that more than one will complete).
// Note: the ctx passed in must be cancelled to avoid leaking a context (this routine cannot cancel the context used for the winning query).
func multiQuery(ctx context.Context, xs []executor, query string, args ...interface{}) (*sql.Rows, error) {
	noOfQueries := len(xs)
	rc := make(chan queryResult) // Channel for results; unbuffered because we only want one, and only one, result
	ec := make(chan error)       // Errors get sent here; each goroutine must send either a result or one error
	defer close(ec)              // Ensure the error consolidation goroutine will complete

	// We need a way to cancel individual goroutines, as we do not know which one will succeed
	cancelFns := make([]context.CancelFunc, noOfQueries)

	// All goroutines must terminate before we exit (otherwise the transaction may be rolled back before they are cancelled, leading to "unexpected command tag SELECT")
	var wg sync.WaitGroup
	wg.Add(noOfQueries)

	for i, x := range xs {
		var queryCtx context.Context
		queryCtx, cancelFns[i] = context.WithCancel(ctx)
		go func(ctx context.Context, queryNo int, x executor) {
			defer wg.Done()

			rows, err := x.QueryContext(ctx, query, args...)
			if err != nil {
				ec <- err // The error collection goroutine is guaranteed to run until all query goroutines complete
				return
			}

			select {
			case rc <- queryResult{queryNo, rows}:
				return
			case <-ctx.Done(): // If another query has already transmitted its results, these should be thrown away
				rows.Close() // Not strictly required, because the closed context should tidy up
				return
			}
		}(queryCtx, i, x)
	}

	// Start a goroutine that will send a MultiError to a channel if all queries fail
	mec := make(chan MultiError)
	go func() {
		var me MultiError
		errCount := 0
		for err := range ec {
			me.append(err)
			errCount++
			if errCount == noOfQueries {
				mec <- me
				return
			}
		}
	}()

	// Wait for one query to succeed or all queries to fail
	select {
	case me := <-mec:
		for _, cancelFn := range cancelFns { // Not strictly required, so long as ctx is eventually cancelled
			cancelFn()
		}
		wg.Wait()
		return nil, me.check()
	case result := <-rc:
		for i, cancelFn := range cancelFns { // Required: wg.Wait below blocks until the other goroutines see their cancelled contexts
			if i != result.no { // Do not cancel the query that returned a result
				cancelFn()
			}
		}
		wg.Wait()
		return result.rows, nil
	}
}

Regarding "go - Recommended way to close redundant sql.Rows objects after goroutines", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60342187/
