- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
本质上,用Google Cloud Datastore计算滚动余额以确定何时该补充用户钱包的最佳方法是什么?
付款与交易
我维护一个付款平台,我们的用户可以在该平台上向第三方代理商付款。不幸的是,我这个行业的本质是这些付款事件不是实时发送给我们的,而是捆绑在一起,在几小时到几周后才发送给我们。
这些是影响用户钱包余额的主要对象:
class Transaction(ndb.model):
user = ndb.KeyProperty(User, required=True)
amount = ndb.FloatProperty(required=True)
# ... other fields
class Payment(ndb.model):
user = ndb.KeyProperty(User, required=True)
amount = ndb.FloatProperty(required=True)
# ... other fields
@classmethod
def charge(cls, user, amount):
# ... make a call to braintree/stripe & save result if successful
class WalletActivity(ndb.Model):
user = ndb.KeyProperty(User, required=True)
post_date = ndb.DateTimeProperty(required=True)
balance_increment = ndb.FloatProperty(required=True)
balance_result = ndb.FloatProperty(required=True)
# the key to the Transaction or Payment object that this is for
object_key = ndb.KeyProperty(required=True)
@classmethod
def create(cls, obj, previous_balance):
return WalletActivity(
user_key=obj.user,
post_date=datetime.datetime.now(),
balance_increment=obj.amount,
balance_result=previous_balance+obj.amount,
object_key=obj.key)
@classmethod
def fetch_last_wallet_activity(cls, user_key):
return cls.query(cls.user == user_key).order(-cls.post_date).get()
WalletActivity.fetch_last_wallet_activity().balance_result
)
class BalanceUpdate(ndb.model):
user = ndb.KeyProperty(User)
cut_off_date = ndb.DateTimeProperty()
balance = ndb.IntegerProperty()
@classmethod
def current_balance(cls, user_key):
last_balance_update = cls.query(cls.user == user_key).order(
-cls.cut_off_date).get()
recent_wallet_activity = WalletActivity.query(cls.user == user_key,
cls.post_date > last_balance_update.cut_off_date).fetch()
return (last_balance_update.balance +
sum([i.balance_increment for i in recent_wallet_activity]))
balance_result
的
WalletActivity
可能更好
def _process_transactions(user, transactions, last_wallet_activity):
transactions_amount = sum([i.amount for i in transactions])
# 2. Replenish their account if the existing balance is low
if last_wallet_activity.balance_result - transactions_amount < user.wallet_bottom_threshold:
payment = Payment.charge(
user=user,
amount=user.wallet_replenish_amount + transactions_amount)
payment.put()
last_wallet_activity = WalletActivity.create(
obj=payment,
previous_balance=last_wallet_activity.balance_result)
last_wallet_activity.put()
# 3. Add the transactions to their wallet
new_objects = []
for transaction in transactions:
last_wallet_activity = WalletActivity.create(
obj=transaction,
previous_balance=last_wallet_activity.balance_result)
new_objects.append(last_wallet_activity)
ndb.put_multi(new_objects)
return new_objects
def process_transactions_1(user, transactions):
# 1. Get the user's balance from the last WalletActivity
last_wallet_activity = WalletActivity.fetch_last_wallet_activity(user_key=user.key)
return _process_transactions(user, transactions, last_wallet_activity)
WalletActivity.fetch_last_wallet_activity().balance_result
和的问题
BalanceUpdate.current_balance()
是数据存储区查询最终是一致的。
WalletActivity
的密钥,因为通过密钥进行的提取是高度一致的:
class LastWalletActivity(ndb.Model):
last_wallet_activity = ndb.KeyProperty(WalletActivity, required=True)
@classmethod
def get_for_user(cls, user_key):
# LastWalletActivity has the same key as the user it is for
return ndb.Key(cls, user_key.id()).get(use_cache=False, use_memcache=False)
def process_transactions_2(user, transactions):
# 1. Get the user's balance from the last WalletActivity
last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
new_objects = _process_transactions(user, transactions, last_wallet_activity.last_wallet_activity)
# update LastWalletActivity
last_wallet_activity.last_wallet_activity = new_objects[-1].key
last_wallet_activity.put()
return new_objects
last_wallet_activity
存储在
User
对象上,但是我不想担心竞争条件
last_wallet_activity
新值
class UserPaymentLock(ndb.Model):
lock_time = ndb.DateTimeProperty(auto_now_add=True)
@classmethod
@ndb.transactional()
def lock_user(cls, user_key):
# UserPaymentLock has the same key as the user it is for
key = ndb.Key(cls, user_key.id())
lock = key.get(use_cache=False, use_memcache=False)
if lock:
# If the lock is older than a minute, still return False, but delete it
# There are situations where the instance can crash and a user may never get unlocked
if datetime.datetime.now() - lock.lock_time > datetime.timedelta(seconds=60):
lock.key.delete()
return False
key.put()
return True
@classmethod
def unlock_user(cls, user_key):
ndb.Key(cls, user_key.id()).delete()
def process_transactions_3(user, transactions):
# Attempt to lock the account, abort & try again if already locked
if not UserPaymentLock.lock_user(user_key=user.key):
raise Exception("Unable to acquire payment lock")
# 1. Get the user's balance from the last WalletActivity
last_wallet_activity = LastWalletActivity.get_for_user(user_key=user.key)
new_objects = _process_transactions(user, transactions, last_wallet_activity.last_wallet_activity)
# update LastWalletActivity
last_wallet_activity.last_wallet_activity = new_objects[-1].key
last_wallet_activity.put()
# unlock the account
UserPaymentLock.unlock_user(user_key=user.key)
return new_objects
最佳答案
有关竞争的设计注意事项
通常,对于您的Dan's answer,我完全同意original question的观点,尽管在您的特定用例中,使用大型实体组可能是合理的。
争用错误可能会以每秒超过1个写入op的速度发生在同一实体组上,例如一个特定的钱包。此限制还适用于从数据存储事务中的数据存储读取的实体,而无需显式地写回它们(我的理解是它们是written together with the modified entities for serializability)。
虽然1秒规则不是强制性的限制,但根据我的经验,Cloud Datastore通常可以处理略高于该限制的短脉冲,但不能保证,因此通常建议并采取最佳做法来避免大型实体组,尤其是在写操作不是来自同一用户。相反,将用户发布的所有评论存储在同一实体组(作者=父级)中可能是安全的,因为用户每秒发布多个评论的可能性很小,甚至是不可取的。另一个示例可能是对时间不敏感且不面向用户的后台任务,其中对实体组的写操作是按每个实体组编排的,或者至少在发生争用时可以大大减少写操作。
在突然以非常高的速率添加新实体的情况下,争用错误也可能是由单调增加的键/ ID或索引属性和复合索引(索引值彼此之间过于接近)(例如时间戳)引起的。建议让数据存储区自动创建新实体的ID(例如用户ID),因为数据存储区会充分分散密钥。并且要么避免在可能出现单调递增值的位置建立索引属性,要么在值前面加上哈希值。
Cloud Datastore文章Best Practices包含有关Designing for scale的部分,该部分提供了非常有用的建议。
就是说,与编写试图模仿事务方面的应用程序逻辑相比,设计一种应用程序以使其在写限制内安全地工作并依靠数据存储区(或其他数据库)的事务性和强大的一致性支持可能更容易。竞争状况,死锁等等可能导致错误,并使系统更脆弱,更容易出错。
(A)小团体的钱包活动
在最初的问题中,您提到公司帐户和付款,这建议了一些实时付款解决方案。方案:该公司中的成千上万个用户可以为一个帐户提交新交易,但很多人可能同时提交新交易,因此风险很高。如果每笔交易都属于同一实体组(公司帐户),则很容易导致争用错误。如果在写操作中执行重试,则这些操作会导致延迟延长,直到用户得到对其事务请求的响应为止。但是即使重试,写操作也很可能经常失败,并且用户经常会遇到服务器错误。这将带来糟糕的用户体验。
我倾向于选择(2),但使用Wallet
类型。您对在何处存储last_wallet_activity
表示担忧。可能有自己的Wallet
类型,该ID始终与User
相同。在这种情况下,您可以有两个单独的实体组,而不关心用户触发的User
对象上的中间更改。我还将使用数据存储事务。在同一笔交易中最多可以允许25个不同的实体组,即一批中可能有23个事件。使用当前的设计,这也将是每个钱包的最大写入率。不确定这是您的应用可接受的限制。
但是,如果某个钱包确实有很多交易(通过频繁更新last_wallet_activity
),您可能会再次面临争用的风险。为了获得每个钱包更高的写入率并避免这种争用,您还可以将选项(1)与某种sharding结合使用。
选项(3)尝试实现事务方面(请参见前面)。选项(1)确实遭受这样的事实,即查询仅最终是一致的。
(B)单个大型实体组中的钱包活动
但是,这里的问题提到所有这些钱包活动都是在后台处理的(不是通过用户请求),并且事件不是实时处理的,而是要延迟几个小时或几周。这将允许通过后台任务分批处理它们。假设您的应用将位于其他Cloud Datastore limits之内,例如如果交易的最大大小为10 MiB,则您的应用可以将每个公司钱包(实体组)每秒最多进行500次钱包活动批量写入单个写入操作。这相当于每个公司帐户每小时180万个钱包活动。即使发生重试和中断,这对于最大的公司帐户也足够了吗?如果是,并且您的产品从不更改为面向用户的实时钱包活动,那么我不明白为什么不应该将钱包活动放入每个钱包的实体组中。当然,另一种方法可能是每个公司帐户只有多个钱包。
在这种情况下,您的选项(1)应该可以使用,因为祖先查询(WalletActivity
查询中的钱包是祖先)是高度一致的。
使用ndb.put_multi()
我已经看到您在同一请求中使用了多个put()
调用,您可以在其中完美地将实体收集在某个toPut
列表中,然后将它们全部一起编写。这样,您可以节省实例运行时,还可以减少对同一实体组的写操作次数。
如何避免重复的付款请求
关于向Braintree或Stripe的付款请求:
在将钱包活动添加到将写入数据存储的下一批之前,请检查钱包的余额是否足够。
如果余额不足,请停止添加更多活动。
在将批处理写入数据存储区的数据存储区事务内,添加transactional push task(如果事务失败,则不会创建)。我相信,GAE / NDB每个HTTP请求最多接受5个事务任务。
该任务将负责将请求发送到Braintree / Stripe,并更新钱包的余额。并且这也应该在数据存储事务中,并具有事务任务以继续处理它之前离开的事件。
您需要处理Braintree / Stripe拒绝付款请求的情况。
我也不知道您的事件如何到达应用程序,因此我不确定安排每个钱包的批处理任务的最佳方法。上面的模式表明,对于每个钱包来说,只有一个任务正在运行(即,对于同一个钱包,没有并行的多个任务/批次)。但是,您可以通过不同的方式执行此操作,具体取决于事件到达应用程序的方式。
关于python - 用户的Google NDB数据存储支票帐户/电子钱包。如何计算余额,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48738119/
我正在运行一个辅助角色,并检查 Azure 上托管的存储中是否存在数据。当我将连接字符串用于经典类型的存储时,我的代码可以正常工作,但是当我连接到 V2 Azure 存储时,它会抛出此异常。 “远程服
在我的应用程序的主页上,我正在进行 AJAX 调用以获取应用程序各个部分所需的大量数据。该调用如下所示: var url = "/Taxonomy/GetTaxonomyList/" $.getJSO
大家好,我正在尝试将我的商店导入我的 Vuex Route-Gard。 路由器/auth-guard.js import {store} from '../store' export default
我正在使用 C# 控制台应用程序 (.NET Core 3.1) 从 Azure Blob 存储读取大量图像文件并生成这些图像的缩略图。新图像将保存回 Azure,并将 Blob ID 存储在我们的数
我想将 Mlflow 设置为具有以下组件: 后端存储(本地):在本地使用 SQLite 数据库存储 Mlflow 实体(run_id、params、metrics...) 工件存储(远程):使用 Az
我正在使用 C# 控制台应用程序 (.NET Core 3.1) 从 Azure Blob 存储读取大量图像文件并生成这些图像的缩略图。新图像将保存回 Azure,并将 Blob ID 存储在我们的数
我想将 Mlflow 设置为具有以下组件: 后端存储(本地):在本地使用 SQLite 数据库存储 Mlflow 实体(run_id、params、metrics...) 工件存储(远程):使用 Az
我的 Windows 计算机上的本地文件夹中有一些图像。我想将所有图像上传到同一容器中的同一 blob。 我知道如何使用 Azure Storage SDKs 上传单个文件BlockBlobServi
我尝试发出 GET 请求来获取我的 Azure Blob 存储帐户的帐户详细信息,但每次都显示身份验证失败。谁能判断形成的 header 或签名字符串是否正确或是否存在其他问题? 代码如下: cons
这是用于编写 JSON 的 NeutralinoJS 存储 API。是否可以更新 JSON 文件(推送数据),而不仅仅是用新的 JS 对象覆盖数据。怎么做到的??? // Javascript
我有一个并行阶段设置,想知道是否可以在嵌套阶段之前运行脚本,所以像这样: stage('E2E-PR-CYPRESS') { when { allOf {
我想从命令行而不是从GUI列出VirtualBox VM的详细信息。我对存储细节特别感兴趣。 当我在GUI中单击VM时,可以看到包括存储部分在内的详细信息: 但是到目前为止,我还没有找到通过命令行执行
我有大约 3500 个防洪设施,我想将它们表示为一个网络来确定流动路径(本质上是一个有向图)。我目前正在使用 SqlServer 和 CTE 来递归检查所有节点及其上游组件,只要上游路径没有 fork
谁能告诉我 jquery data() 在哪里存储数据以及何时删除以及如何删除? 如果我用它来存储ajax调用结果,会有性能问题吗? 例如: $("body").data("test", { myDa
有人可以建议如何为 Firebase 存储中的文件设置备份。我能够备份数据库,但不确定如何为 firebase 存储中的文件(我有图像)设置定期备份。 最佳答案 如何进行 Firebase 存储的本地
我最近开始使用 firebase 存储和 firebase 功能。现在我一直在开发从功能到存储的文件上传。 我已经让它工作了(上传完成并且文件出现在存储部分),但是,图像永远保持这样(永远在右侧加载)
我想只允许用户将文件上传到他们自己的存储桶中,最大文件大小为 1MB,仍然允许他们删除文件。我添加了以下内容: match /myusers/{userId}/{allPaths=**} { al
使用生命周期管理策略将容器的内容从冷访问层移动到存档。我正在尝试以下策略,希望它能在一天后将该容器中的所有文件移动到存档层,但事实并非如此在职的。我设置了选择标准“一天未使用后”。 这是 json 代
对于连接到 Azure 存储端点,有 http 和 https 两个选项。 第一。 https 会带来开销,可能是 5%-10%,但我不支付同一个数据中心的费用。 第二。 http 更快,但 Auth
有人可以帮我理解这一点吗?我创建了Virtual Machine in Azure running Windows Server 2012 。我注意到 Azure 自动创建了一个存储帐户。当我进入该存
我是一名优秀的程序员,十分优秀!