gpt4 book ai didi

mysql - 尝试用 MySQL 和进程理解 Django select_for_update()

转载 作者:行者123 更新时间:2023-12-05 04:55:57 28 4
gpt4 key购买 nike

我有一个由 MySQL 数据库支持的 Django 应用程序。我最近将一段代码从请求流中移出并放入了一个流程中。该代码使用 select_for_update() 锁定数据库中受影响的行,但现在我偶尔会看到进程更新记录,而它应该在主线程中锁定。如果我将我的执行器从 ProcessPoolExecutor 切换到 ThreadPoolExecutor,锁定会按预期工作。我认为 select_for_update() 在数据库级别运行,因此无论代码是在线程、进程中,还是在另一台机器上,都不应该有任何区别 - 我错过了什么?

我已将我的代码归结为一个表现出相同行为的示例:

from concurrent import futures
import logging
from time import sleep
from django.db import transaction
from myapp.main.models import CompoundBase

logger = logging.getLogger()
executor = futures.ProcessPoolExecutor()
# executor = futures.ThreadPoolExecutor()


def test() -> None:
pk = setup()

f1 = executor.submit(select_and_sleep, pk)
f2 = executor.submit(sleep_and_update, pk)

futures.wait([f1, f2])

def setup() -> int:
cb = CompoundBase.objects.first()
cb.corporate_id = 'foo'
cb.save()

return cb.pk

def select_and_sleep(pk: int) -> None:
try:
with transaction.atomic():
cb = CompoundBase.objects.select_for_update().get(pk=pk)
print('Locking')
sleep(5)
cb.corporate_id = 'baz'
cb.save()
print('Updated after sleep')
except Exception:
logger.exception('select_and_sleep')

def sleep_and_update(pk: int) -> None:
try:
sleep(2)
print('Updating')
with transaction.atomic():
cb = CompoundBase.objects.select_for_update().get(pk=pk)
cb.corporate_id = 'bar'
cb.save()
print('Updated without sleep')
except Exception:
logger.exception('sleep_and_update')

test()

当如图所示运行时,我得到:

Locking
Updating
Updated without sleep
Updated after sleep

但如果我更改为 ThreadPoolExecutor,我会得到:

Locking
Updating
Updated after sleep
Updated without sleep

最佳答案

好消息是它大部分都在那里,我四处阅读并根据我找到的答案 here

我假设您在 Linux 上运行,因为这似乎是平台上的行为。

看起来在 Linux 下默认的进程启动策略是 fork 策略,这通常是您想要的,但是在这种确切的情况下,资源(例如数据库连接)似乎正在共享,导致数据库操作被视为同一事务,因此不会被阻止。为了获得您想要的行为,每个进程似乎都需要自己的资源并且不与其父进程(以及随后的父进程的任何其他子进程)共享资源。

使用以下代码可以获得您想要的行为,但请注意,我不得不将代码拆分为两个文件。

fn.py

from time import sleep

from django.db import transaction
import django

django.setup()

from myapp.main.models import CompoundBase


def setup() -> int:
cb = CompoundBase.objects.first()
cb.corporate_id = 'foo'
cb.save()

return cb.pk

def select_and_sleep(pk: int) -> None:
try:
with transaction.atomic():
cb = CompoundBase.objects.select_for_update().get(pk=pk)
print('Locking')
sleep(5)
cb.corporate_id = 'baz'
cb.save()
print('Updated after sleep')

except Exception:
logger.exception('select_and_sleep')

def sleep_and_update(pk: int) -> None:
try:
sleep(2)
print('Updating')

with transaction.atomic():
cb = CompoundBase.objects.select_for_update().get(pk=pk)
cb.corporate_id = 'bar'
cb.save()
print('Updated without sleep')

except Exception:
logger.exception('sleep_and_update')

proc_test.py

from concurrent import futures
from multiprocessing import get_context
from time import sleep
import logging

import fn

logger = logging.getLogger()
executor = futures.ProcessPoolExecutor(mp_context=get_context("forkserver"))
# executor = futures.ThreadPoolExecutor()


def test() -> None:
pk = fn.setup()

f1 = executor.submit(fn.select_and_sleep, pk)
f2 = executor.submit(fn.sleep_and_update, pk)

futures.wait([f1, f2])

test()

有三种启动进程的策略,forkspawnforkserver,使用spawnforkserver 似乎可以让您获得所需的行为。

引用资料:

关于mysql - 尝试用 MySQL 和进程理解 Django select_for_update(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65110287/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com