gpt4 book ai didi

python - SQLAlchemy 计算列

转载 作者:IT老高 更新时间:2023-10-28 22:01:31 32 4
gpt4 key购买 nike

(新 SQLAlchemy 用户警报)我有三个表:一个人、从特定日期开始的人员每小时费率和每日时间报告。我正在寻找正确的方法来计算当天人员小时费率的时基成本。

是的,我可以在创建时计算值并将其作为模型的一部分,但可以将此视为在幕后汇总更复杂数据的示例。如何计算 Time.cost?它是混合属性、列属性还是完全不同的东西?

class Person(Base):
__tablename__ = 'person'
personID = Column(Integer, primary_key=True)
name = Column(String(30), unique=True)

class Payrate(Base):
__tablename__ = 'payrate'
payrateID = Column(Integer, primary_key=True)
personID = Column(Integer, ForeignKey('person.personID'))
hourly = Column(Integer)
starting = Column(Date)
__tableargs__ =(UniqueConstraint('personID', 'starting',
name='uc_peron_starting'))

class Time(Base):
__tablename__ = 'entry'
entryID = Column(Integer, primary_key=True)
personID = Column(Integer, ForeignKey('person.personID'))
workedon = Column(Date)
hours = Column(Integer)

person = relationship("Person")

def __repr__(self):
return "<{date} {hours}hrs ${0.cost:.02f}>".format(self,
date=self.workedon.isoformat(), hours=to_hours(self.hours))

@property
def cost(self):
'''Cost of entry
'''
## This is where I am stuck in propery query creation
return self.hours * query(Payrate).filter(
and_(Payrate.personID==personID,
Payrate.starting<=workedon
).order_by(
Payrate.starting.desc())

最佳答案

您在这里遇到的问题,为了尽可能优雅地解决,使用了 非常 高级 SQLAlchemy 技术,所以我知道您是初学者,但这个答案将向您展示所有出路到最后。然而,解决这样的问题需要一步一步来,随着我们的进行,您可以通过不同的方式得到您想要的答案。

在了解如何混合这个或其他内容之前,您需要考虑一下 SQL。我们如何在任意一系列行上查询 Time.cost?我们可以干净地将 Time 链接到 Person,因为我们有一个简单的外键。但是将 Time 与 Payrate 链接到这个特定的模式是很棘手的,因为 Time 不仅通过 person_id 还通过 workon 链接到 Payrate - 在 SQL 中,我们最容易使用 "time.person_id = person.id AND time.在 payrate.start_date 和 payrate.end_date 之间工作”。但是这里没有“end_date”,这意味着我们也必须推导出它。这个推导是最棘手的部分,所以我想出的是这样开始的(我已经小写了你的列名):

SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
FROM payrate LEFT OUTER JOIN
(SELECT pa1.payrate_id, MIN(pa2.starting) as ending FROM payrate AS pa1
JOIN payrate AS pa2 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
GROUP BY pa1.payrate_id
) AS ending ON payrate.payrate_id=ending.payrate_id

可能有其他方法可以得到这个,但这就是我想出的 - 其他方法几乎肯定会发生一些类似的事情(即子查询、连接)。

因此,通过支付率开始/结束,我们可以弄清楚查询的样子。我们想使用 BETWEEN 将时间条目与日期范围相匹配,但最新的 payrate 条目的“结束”日期将为 NULL,因此解决此问题的一种方法是对非常高的日期使用 COALESCE(另一种是使用条件):

SELECT *, entry.hours * payrate_derived.hourly
FROM entry
JOIN
(SELECT payrate.person_id, payrate.hourly, payrate.starting, ending.ending
FROM payrate LEFT OUTER JOIN
(SELECT pa1.payrate_id, MIN(pa2.starting) as ending FROM payrate AS pa1
JOIN payrate AS pa2 ON pa1.person_id = pa2.person_id AND pa2.starting > pa1.starting
GROUP BY pa1.payrate_id
) AS ending ON payrate.payrate_id=ending.payrate_id) as payrate_derived
ON entry.workedon BETWEEN payrate_derived.starting AND COALESCE(payrate_derived.ending, "9999-12-31")
AND entry.person_id=payrate_derived.person_id
ORDER BY entry.person_id, entry.workedon

现在,@hybrid 在 SQLAlchemy 中可以为您做的事情,当在 SQL 表达式级别运行时,正是“entry.hours * payrate_derived.hourly”部分,就是这样。那里的所有 JOIN 等,您需要在外部提供给混合动力车。

所以我们需要把那个大子查询加入到这个:

class Time(...):
@hybrid_property
def cost(self):
# ....

@cost.expression
def cost(cls):
return cls.hours * <SOMETHING>.hourly

那么让我们弄清楚 <SOMETHING>是。将该 SELECT 构建为一个对象:

from sqlalchemy.orm import aliased, join, outerjoin
from sqlalchemy import and_, func

pa1 = aliased(Payrate)
pa2 = aliased(Payrate)
ending = select([pa1.payrate_id, func.min(pa2.starting).label('ending')]).\
select_from(join(pa1, pa2, and_(pa1.person_id == pa2.person_id, pa2.starting > pa1.starting))).\
group_by(pa1.payrate_id).alias()

payrate_derived = select([Payrate.person_id, Payrate.hourly, Payrate.starting, ending.c.ending]).\
select_from(outerjoin(Payrate, ending, Payrate.payrate_id == ending.c.payrate_id)).alias()

cost()在表达式方面,hybrid 需要引用 payrate_derived(我们将在一分钟内完成 python 方面):

class Time(...):
@hybrid_property
def cost(self):
# ....

@cost.expression
def cost(cls):
return cls.hours * payrate_derived.c.hourly

那么为了使用我们的cost()混合,它必须在具有该连接的查询的上下文中。注意这里我们使用 Python 的 datetime.date.max获得最大日期(方便!):

print session.query(Person.name, Time.workedon, Time.hours, Time.cost).\
select_from(Time).\
join(Time.person).\
join(payrate_derived,
and_(
payrate_derived.c.person_id == Time.person_id,
Time.workedon.between(
payrate_derived.c.starting,
func.coalesce(
payrate_derived.c.ending,
datetime.date.max
)
)
)
).\
all()

所以这个连接很大而且很笨拙,我们需要经常这样做,更不用说当我们在 Python 中进行混合时,我们需要在 Python 中加载相同的集合。我们可以使用 relationship() 映射到它,这意味着我们必须设置自定义连接条件,但我们还需要实际映射到该子查询,使用一种鲜为人知的技术,称为非主映射器。非主映射器为您提供了一种将类映射到任意表或 SELECT 构造的方法,仅用于选择行。我们通常不需要使用它,因为 Query 已经允许我们查询任意列和子查询,但是要从 relationship() 中获取它。它需要一个映射。映射需要定义一个主键,关系也需要知道关系的哪一边是“外来的”。这是这里最先进的部分,在这种情况下,它的工作原理如下:

from sqlalchemy.orm import mapper, relationship, foreign

payrate_derived_mapping = mapper(Payrate, payrate_derived, non_primary=True,
primary_key=[
payrate_derived.c.person_id,
payrate_derived.c.starting
])
Time.payrate = relationship(
payrate_derived_mapping,
viewonly=True,
uselist=False,
primaryjoin=and_(
payrate_derived.c.person_id == foreign(Time.person_id),
Time.workedon.between(
payrate_derived.c.starting,
func.coalesce(
payrate_derived.c.ending,
datetime.date.max
)
)
)
)

所以这是我们必须看到的最后一次加入。我们现在可以更早地进行查询:

print session.query(Person.name, Time.workedon, Time.hours, Time.cost).\
select_from(Time).\
join(Time.person).\
join(Time.payrate).\
all()

最后我们可以连接我们的新 payrate关系到 Python 级别的混合:

class Time(Base):
# ...

@hybrid_property
def cost(self):
return self.hours * self.payrate.hourly

@cost.expression
def cost(cls):
return cls.hours * payrate_derived.c.hourly

我们在这里的解决方案花费了很多精力,但至少最复杂的部分,即工资率映射,完全只在一个地方,我们不需要再看一遍。

这是一个完整的工作示例:

from sqlalchemy import create_engine, Column, Integer, ForeignKey, Date, \
UniqueConstraint, select, func, and_, String
from sqlalchemy.orm import join, outerjoin, relationship, Session, \
aliased, mapper, foreign
from sqlalchemy.ext.declarative import declarative_base
import datetime
from sqlalchemy.ext.hybrid import hybrid_property


Base = declarative_base()

class Person(Base):
__tablename__ = 'person'
person_id = Column(Integer, primary_key=True)
name = Column(String(30), unique=True)

class Payrate(Base):
__tablename__ = 'payrate'
payrate_id = Column(Integer, primary_key=True)
person_id = Column(Integer, ForeignKey('person.person_id'))
hourly = Column(Integer)
starting = Column(Date)

person = relationship("Person")
__tableargs__ =(UniqueConstraint('person_id', 'starting',
name='uc_peron_starting'))

class Time(Base):
__tablename__ = 'entry'
entry_id = Column(Integer, primary_key=True)
person_id = Column(Integer, ForeignKey('person.person_id'))
workedon = Column(Date)
hours = Column(Integer)

person = relationship("Person")

@hybrid_property
def cost(self):
return self.hours * self.payrate.hourly

@cost.expression
def cost(cls):
return cls.hours * payrate_derived.c.hourly

pa1 = aliased(Payrate)
pa2 = aliased(Payrate)
ending = select([pa1.payrate_id, func.min(pa2.starting).label('ending')]).\
select_from(join(pa1, pa2, and_(
pa1.person_id == pa2.person_id,
pa2.starting > pa1.starting))).\
group_by(pa1.payrate_id).alias()

payrate_derived = select([Payrate.person_id, Payrate.hourly, Payrate.starting, ending.c.ending]).\
select_from(outerjoin(Payrate, ending, Payrate.payrate_id == ending.c.payrate_id)).alias()

payrate_derived_mapping = mapper(Payrate, payrate_derived, non_primary=True,
primary_key=[
payrate_derived.c.person_id,
payrate_derived.c.starting
])
Time.payrate = relationship(
payrate_derived_mapping,
viewonly=True,
uselist=False,
primaryjoin=and_(
payrate_derived.c.person_id == foreign(Time.person_id),
Time.workedon.between(
payrate_derived.c.starting,
func.coalesce(
payrate_derived.c.ending,
datetime.date.max
)
)
)
)



e = create_engine("postgresql://scott:tiger@localhost/test", echo=False)
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

session = Session(e)
p1 = Person(name='p1')
session.add(p1)

session.add_all([
Payrate(hourly=10, starting=datetime.date(2013, 5, 17), person=p1),
Payrate(hourly=15, starting=datetime.date(2013, 5, 25), person=p1),
Payrate(hourly=20, starting=datetime.date(2013, 6, 10), person=p1),
])

session.add_all([
Time(person=p1, workedon=datetime.date(2013, 5, 19), hours=10),
Time(person=p1, workedon=datetime.date(2013, 5, 27), hours=5),
Time(person=p1, workedon=datetime.date(2013, 5, 30), hours=5),
Time(person=p1, workedon=datetime.date(2013, 6, 18), hours=12),
])
session.commit()

print session.query(Person.name, Time.workedon, Time.hours, Time.cost).\
select_from(Time).\
join(Time.person).\
join(Time.payrate).\
all()

for time in session.query(Time):
print time.person.name, time.workedon, time.hours, time.payrate.hourly, time.cost

输出(第一行是聚合版本,其余是每个对象):

[(u'p1', datetime.date(2013, 5, 19), 10, 100), (u'p1', datetime.date(2013, 5, 27), 5, 75), (u'p1', datetime.date(2013, 5, 30), 5, 75), (u'p1', datetime.date(2013, 6, 18), 12, 240)]
p1 2013-05-19 10 10 100
p1 2013-05-27 5 15 75
p1 2013-05-30 5 15 75
p1 2013-06-18 12 20 240

关于python - SQLAlchemy 计算列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17326290/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com