gpt4 book ai didi

python-3.x - 如何在多个代理或 faust 计时器之间共享 faust 表?

转载 作者:行者123 更新时间:2023-12-04 13:40:19 24 4
gpt4 key购买 nike

我正在尝试在一段时间后将 faust 表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问​​表的数据。
下面是定时器的代码:

@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
await anomaly_topic.send(
value=str(page_views['total'].value())
)
@app.agent(page_view_topic)
async def count_page_views(views):
async for view in views.group_by(PageView.id):
total=0
page_views[view.id]+=1
for everykey in list(page_views.keys()):
if everykey != 'total':
total+=page_views[everykey].value()
page_views['total'] = total

代理工作正常。我能够正确地看到这些值。

最佳答案

在我尝试做同样的事情时发现了这个问题,这是我如何弄清楚的。
https://faust.readthedocs.io/en/latest/userguide/tables.html

You cannot modify a table outside of a stream operation; this meansthat you can only mutate the table from within an async for event instream: block. We require this to align the table’s partitions withthe stream’s, and to ensure the source topic partitions are correctlyrebalanced to a different worker upon failure, along with anynecessary table partitions.

Modifying a table outside of a stream will raise an error:


该文档说您无法在流操作之外访问/修改表。
为了解决这个问题,您可以将计时器功能分为两部分:
@app.timer(10)
async def my_timer_function():
# value does not matter as much as the send operation
await my_calling_function.send(value="send data now!")

@app.agent()
async def my_calling_function(stream_from_timer_func):
async for message in stream_from_timer_func:
print(message) # this will print "send data now!"
table_data = my_table['key']
# Here is where you can access your table data and finish sending the
# message to the topic you want
await my_topic.send(value=table_data)
正如您所看到的,如果您使用计时器功能向代理发送消息,您就可以访问您想要的表,它只需要在一个
async for event in stream:
代码块。

关于python-3.x - 如何在多个代理或 faust 计时器之间共享 faust 表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58095966/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com