- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我引用了文档 here但它没有提到当我的 Faust 代理将数据发送到接收器时要做什么。
没有接收器,我的测试工作正常,但是当我使用接收器时,我收到此错误:
RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...
app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')
detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)
@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
detection_count = 0
async for detection in detections:
detection_count += 1
# r.set("detection_count", detection_count)
yield detection
import ml_exporter
patch('ml_exporter.graph_topic', None)
def create_app():
return faust.App('ml-exporter', value_serializer='json')
@pytest.fixture()
def test_app(event_loop):
app = create_app()
app.finalize()
app.flow_control.resume()
return app
@pytest.mark.asyncio()
async def test_processDetections(test_app):
async with ml_exporter.processDetections.test_context() as agent:
event = await agent.put('hey')
assert agent.results[event.message.offset] == 'hey'
最佳答案
强制 pytest 使用 Faust 的 asyncio 事件循环作为默认的全局循环。将以下 fixture 添加到您的测试代码中:
@pytest.mark.asyncio()
@pytest.fixture()
def event_loop():
yield app.loop
如
pytest documentation 中所述:
The
event_loop
fixture can be easily overridden in any of the standard pytest locations (e.g. directly in the test file, or in conftest.py) to use a non-default event loop. If thepytest.mark.asyncio
marker is applied, a pytest hook will ensure the produced loop is set as the default global loop.
关于python - 如何测试将数据发送到接收器的 Faust 代理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62297894/
从 Faust 文档中我无法找到如何将消费者设置为特定的偏移量。 对于 confluent-kafka,我使用 consumer.offsets_for_times 来查找 start_offset,
我正在尝试在一段时间后将 faust 表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问表的数据。 下面是定时器的代码: @app.tim
我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我引用了文档 here但它没有提到当我的 Faust 代理将数据发送到接收器时要做什么。 没有接收器,我的测试工作正常,但是当我
在分布式系统和实时数据处理中,流处理是十分重要的技术。在数据密集型应用中,数据快速到达,转瞬即逝,需要及时进行处理,流式处理强调数据和事件的处理速度,对性能和可靠性有较高的要求。 流处理框架包括:
我正在尝试让一个浮士德代理在 flask View /端点内转换消息, 我找不到任何例子,我真的很挣扎。 有没有人成功尝试过这个? 文档说使用 gevent 或 eventlet 作为 asyncio
我很好奇您应该如何表达您希望将消息快速传递到 Kafka 主题。他们自述文件中的示例似乎没有写入主题: import faust class Greeting(faust.Record): f
我目前正在研究一个用例,使用 Kafka 和 robinhood 的 faust 来处理来自 Kafka 的数据。我已经成功地进行了计算,我需要的结果正在打印到我的 faust worker 正在运行
我有一个简单的应用程序,具有两个功能,一个用于收听主题,另一个用于 Web 端点。我想创建服务器端事件流(SSE),即文本/事件流,以便在客户端我可以使用 EventSource 收听它。 我现在有以
我刚刚安装了 Faust 并运行了一个基本程序来通过 Kafka 发送和接收消息。我使用了 (Faust example of publishing to a kafka topic) 中提到的示例代
我是一名优秀的程序员,十分优秀!