- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个简单的应用程序,包含两个函数:一个用于监听 Kafka 主题,另一个用于提供 Web 端点。我想创建服务器发送事件(SSE)流,即 text/event-stream,以便在客户端使用 EventSource 来监听它。
我现在有以下代码,每个函数都在做它的特定工作:
import faust
from faust.web import Response
# Faust application "app1": uses the Kafka broker at localhost:29092 and the
# "raw" value serializer.
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
# Topic handle for the Kafka topic "test"; the agent below subscribes to it.
test_topic = app.topic("test")
@app.agent(test_topic)
async def test_topic_agent(stream):
    """Log every value arriving on ``test_topic`` and yield it unchanged."""
    async for value in stream:
        # Build the log line first so the print call stays trivial.
        log_line = f"test_topic_agent RECEIVED -- {value!r}"
        print(log_line)
        yield value
@app.page("/")
async def index(self, request):
    """Serve the root page with the plain-text body ``yey``."""
    body = "yey"
    return self.text(body)
import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime
async def hello(request):
    """Stream the server time to the client as Server-Sent Events.

    Opens an SSE response for ``request`` and sends one
    ``Server Time : <now>`` event per second for as long as the loop runs.

    Args:
        request: The incoming HTTP request handed to ``sse_response``.

    Returns:
        The SSE response object, once the streaming context has exited.
    """
    async with sse_response(request) as resp:
        while True:
            data = 'Server Time : {}'.format(datetime.now())
            print(data)
            await resp.send(data)
            # No explicit ``loop=`` argument: it was deprecated in Python 3.8
            # and removed in 3.10, where passing it raises TypeError.
            # sleep() always uses the running event loop.
            await asyncio.sleep(1)
    return resp
async def index(request):
    """Return the HTML page whose script subscribes to ``/hello`` via EventSource.

    The page writes the data of every received event into the element with
    id ``response``.
    """
    page = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return Response(text=page, content_type='text/html')
# Build the aiohttp application and register both GET handlers.
app = web.Application()
# /hello serves the SSE stream that the page's EventSource connects to.
app.router.add_route('GET', '/hello', hello)
# / serves the HTML page itself.
app.router.add_route('GET', '/', index)
# Serve on 127.0.0.1:8080. Note this runs at module level (no __main__ guard).
web.run_app(app, host='127.0.0.1', port=8080)
import faust
from faust.web import Response
# Faust application "app1": uses the Kafka broker at localhost:29092 and the
# "raw" value serializer.
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
# Topic handle for the Kafka topic "test", passed to @app.agent on index below.
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
# NOTE: this decorator order fails at import time. Decorators apply bottom-up,
# so @app.agent runs first and @app.page receives an Agent object instead of a
# function; building the view then raises
# AttributeError: 'Agent' object has no attribute '__name__'
# (see the traceback that follows).
@app.page("/", name="t1")
@app.agent(test_topic, name="t")
async def index(self, request):
    return self.text("yey")
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 299, in find_app
val = symbol_by_name(app, imp=imp)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 262, in symbol_by_name
module = imp( # type: ignore
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/faust_demo/bin/faust", line 8, in <module>
sys.exit(cli())
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 781, in main
with self.make_context(prog_name, args, **extra) as ctx:
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 407, in make_context
self._maybe_import_app()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 372, in _maybe_import_app
find_app(appstr)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 303, in find_app
val = imp(app)
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
return imp(module, package=package)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
async def index(self, request):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
view = view_base.from_handler(cast(ViewHandlerFun, fun))
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'
import faust
from faust.web import Response
# Faust application "app1": uses the Kafka broker at localhost:29092 and the
# "raw" value serializer.
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
# Topic handle for the Kafka topic "test", passed to @app.agent on index below.
test_topic = app.topic("test")
# @app.agent(test_topic)
# async def test_topic_agent(stream):
# async for value in stream:
# print(f"test_topic_agent RECEIVED -- {value!r}")
# yield value
# NOTE: this decorator order imports, but the worker crashes on startup.
# Decorators apply bottom-up, so @app.page runs first and @app.agent wraps the
# object it returns instead of a stream-processing coroutine. When the agent
# starts it calls that object with the stream and raises
# TypeError: __init__() missing 1 required positional argument: 'web'
# (see the worker log that follows).
@app.agent(test_topic, name="t")
@app.page("/", name="t1")
async def index(self, request):
    return self.text("yey")
[2020-03-28 10:32:50,676] [29976] [INFO] [^--Producer]: Creating topic 'app1-__assignor-__leader'
[2020-03-28 10:32:50,695] [29976] [INFO] [^--ReplyConsumer]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^--AgentManager]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^---Agent: app1.index]: Starting...
[2020-03-28 10:32:50,696] [29976] [ERROR] [^Worker]: Error: TypeError("__init__() missing 1 required positional argument: 'web'")
Traceback (most recent call last):
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
return future.result()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
await child.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/manager.py", line 58, in on_start
await agent.maybe_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
await self.start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
await self._default_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
await self._actually_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
await self.on_start()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
await self._on_start_supervisor()
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
res = await self._start_one(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
return await self._start_task(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 617, in _start_task
actor = self(
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 525, in __call__
return self.actor_from_stream(stream,
File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 552, in actor_from_stream
res = self.fun(actual_stream)
TypeError: __init__() missing 1 required positional argument: 'web'
[2020-03-28 10:32:50,703] [29976] [INFO] [^Worker]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Flush producer buffer...
[2020-03-28 10:32:50,703] [29976] [INFO] [^--TableManager]: Stopping...
最佳答案
Faust worker 还将在每个实例上公开一个 Web 服务器,默认情况下在端口 6066 上运行。
该服务器使用 aiohttp HTTP 服务器库,您可以利用这一点创建服务器发送事件 (SSE) 流,就像您的示例代码那样。
您可以创建一个从 Kafka 主题 test 读取消息的代理,它会把变量 last_message_from_topic 更新为该主题收到的最后一条消息;这个变量在您的网页处理函数中同样可以访问。
在索引页面 (@app.page('/')) 中,使用 EventSource 接口接收服务器发送的事件。它通过 HTTP 连接到服务器,并在不关闭连接的情况下从页面 /hello 接收 text/event-stream 格式的事件。
网页 /hello 每秒发送一条文本消息,其中包含来自 Kafka 主题 test 的最后一条消息以及服务器的当前时间。
这是我的文件 my_worker.py 的代码:
import asyncio
from datetime import datetime
import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response
# Faust application "app1": uses the Kafka broker at localhost:9092 and the
# "json" value serializer.
app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
# Topic handle for the Kafka topic "test", consumed by the greet agent.
test_topic = app.topic("test")
# One-element list used as a shared mutable cell: the greet agent overwrites
# index 0 and the /hello page reads it, so no `global` statement is needed.
last_message_from_topic = ['No messages yet']
@app.agent(test_topic)
async def greet(greetings):
    """Remember the most recent message received on ``test_topic``."""
    async for latest in greetings:
        # Slot 0 is the shared cell that the /hello page reads.
        last_message_from_topic[0] = latest
@app.page('/hello')
async def hello(self, request):
    """Stream the latest topic message and server time as Server-Sent Events.

    Opens an SSE response for ``request`` and, once per second, sends a line
    combining ``last_message_from_topic[0]`` with the current server time.

    Args:
        request: The incoming HTTP request handed to ``sse_response``.

    Returns:
        The SSE response object, once the streaming context has exited.
    """
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'
            print(data)
            await resp.send(data)
            # No explicit ``loop=`` argument: it was deprecated in Python 3.8
            # and removed in 3.10, where passing it raises TypeError.
            # sleep() always uses the running event loop.
            await asyncio.sleep(1)
    return resp
@app.page('/')
async def index(self, request):
    """Return the HTML page whose script subscribes to ``/hello`` via EventSource.

    The page writes the data of every received event into the element with
    id ``response``.
    """
    page = """
<html>
<body>
<script>
var evtSource = new EventSource("/hello");
evtSource.onmessage = function(e) {
document.getElementById('response').innerText = e.data
}
</script>
<h1>Response from server:</h1>
<div id="response"></div>
</body>
</html>
"""
    return Response(text=page, content_type='text/html')
使用以下命令启动 worker:
faust -A my_worker worker -l info
然后在浏览器中打开 http://localhost:6066/ 。
向 Kafka 主题 test 发送消息的代码(来自另一个 python 文件):
import time
import json
from kafka import KafkaProducer
# Producer for the broker at localhost:9092; each value is JSON-encoded and
# sent as UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)
# Publish one message per second to the "test" topic.
for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')
# send() is asynchronous and only queues the record; flush so the final
# message is actually delivered before the script exits.
producer.flush()
关于python - 如何使用 Faust Python 包将 kafka 主题与 Web 端点连接起来?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60899445/
从 Faust 文档中我无法找到如何将消费者设置为特定的偏移量。 对于 confluent-kafka,我使用 consumer.offsets_for_times 来查找 start_offset,
我正在尝试在一段时间后将 faust 表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问表的数据。 下面是定时器的代码: @app.tim
我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我引用了文档 here但它没有提到当我的 Faust 代理将数据发送到接收器时要做什么。 没有接收器,我的测试工作正常,但是当我
在分布式系统和实时数据处理中,流处理是十分重要的技术。在数据密集型应用中,数据快速到达,转瞬即逝,需要及时进行处理,流式处理强调数据和事件的处理速度,对性能和可靠性有较高的要求。 流处理框架包括:
我正在尝试让一个 Faust 代理在 flask View /端点内转换消息, 我找不到任何例子,我真的很挣扎。 有没有人成功尝试过这个? 文档说使用 gevent 或 eventlet 作为 asyncio
我很好奇您应该如何表达您希望将消息快速传递到 Kafka 主题。他们自述文件中的示例似乎没有写入主题: import faust class Greeting(faust.Record): f
我目前正在研究一个用例,使用 Kafka 和 robinhood 的 faust 来处理来自 Kafka 的数据。我已经成功地进行了计算,我需要的结果正在打印到我的 faust worker 正在运行
我有一个简单的应用程序,具有两个功能,一个用于收听主题,另一个用于 Web 端点。我想创建服务器端事件流(SSE),即文本/事件流,以便在客户端我可以使用 EventSource 收听它。 我现在有以
我刚刚安装了 Faust 并运行了一个基本程序来通过 Kafka 发送和接收消息。我使用了 (Faust example of publishing to a kafka topic) 中提到的示例代
我是一名优秀的程序员,十分优秀!