我正在使用 faust 并想利用并发功能。列出的示例并未完全演示并发的使用。
我想做的是:从 Kafka producer 读取消息并将嵌套的 JSON 展开,然后把其中的 cargo 发送给负责计算账单等的处理流程。我需要一次将 10 批 cargo 发送给执行计算的函数,因此使用了并发,以便能够同时计算 10 批 cargo。
import faust
import time
import json
from typing import List
import asyncio
class Items(faust.Record):
    """Record type used for the elements of ``Shipments.shipments``."""

    # Item name.
    name: str
    # Unit label paired with ``billing_qty`` (exact vocabulary not shown here).
    billing_unit: str
    # Integer quantity expressed in ``billing_unit``.
    billing_qty: int
class Shipments(faust.Record, serializer="json"):
    """JSON-serialized record holding a list of ``Items`` plus shipment metadata."""

    # Nested item records carried by this message.
    shipments: List[Items]
    ship_type: str
    shipping_service: str
    # Kept as a plain string; no date parsing is declared on this field.
    shipped_at: str
# Faust application connected to the local Kafka broker.
app = faust.App(
    'ships_app',
    broker='kafka://localhost:9092',
)

# Source topic; message values are deserialized into ``Shipments`` records.
ship_topic = app.topic('test_shipments', value_type=Shipments)
@app.agent(value_type=str, concurrency=10)
async def mytask(records):
    """Consume string values sent to this agent and simulate slow work.

    The agent is declared with ``concurrency=10`` so that several records can
    be in flight at once. That only helps if the body yields control while it
    waits, which is why the delay below is awaited rather than blocking.

    Args:
        records: The agent's stream of ``str`` values.
    """
    async for record in records:
        print(f'received....{record}')
        # A blocking time.sleep() here would stall the whole event loop and
        # serialize every agent instance; asyncio.sleep() yields instead.
        await asyncio.sleep(5)
@app.agent(ship_topic)
async def process_shipments(shipments):
    """Flatten each ``Shipments`` record and forward one message per item.

    For every incoming record a shipment-level id is generated; for every
    nested item an item-level id is generated, the flattened row is printed,
    and ``"<uid> -- <item_uuid>"`` is sent to the ``mytask`` agent.

    Args:
        shipments: Stream of ``Shipments`` records read from ``ship_topic``.
    """
    # NOTE(review): the original author also experimented with batching via
    # ``take(100, within=10)`` on the stream -- confirm whether batching is
    # still wanted before reintroducing it.
    async for ships in shipments:
        uid = faust.uuid()
        # The nested list is declared as ``shipments`` on the record; the
        # previous ``ships.items`` did not match any declared field.
        for item in ships.shipments:
            item_uuid = faust.uuid()
            print(f'{uid}, {item_uuid}, {ships.ship_type}, {ships.shipping_service}, {ships.shipped_at}, {item.name}, {item.billing_unit}, {item.billing_qty}')
            await mytask.send(value=f"{uid} -- {item_uuid}")
# Only start the app when this file is executed directly, not when imported.
if __name__ == "__main__":
    app.main()
好的,我弄明白它是如何工作的了。您给出的示例的问题实际上出在 time.sleep 上,而不是并发本身。下面是两个简单的例子,展示了 agent 在启用并发和不启用并发的情况下分别是如何工作的。
import faust
import asyncio
# Example app whose message values are left as raw bytes (no deserialization).
app = faust.App('example_app', broker="kafka://localhost:9092", value_serializer='raw')

# Topic consumed by the example agents in this snippet.
t = app.topic('topic_1')
# @app.agent(t, concurrency=1)
# async def my_task(tasks):
# async for my_task in tasks:
# val = my_task.decode('utf-8')
# if (val == "Meher"):
# # This will print out second because there is only one thread.
# # It'll take 5ish seconds and print out right after Waldo
# print("Meher's a jerk.")
# else:
# await asyncio.sleep(5)
# # Since there's only one thread running this will effectively
# # block the agent.
# print(f"Where did {val} go?")
@app.agent(t, concurrency=2)
async def my_task2(tasks):
    """Demonstrate out-of-order completion when the agent has concurrency=2.

    Each value arrives as raw bytes and is decoded as UTF-8. "Meher" is
    answered immediately; any other name is answered after an awaited
    five-second pause. The intent of the example is that, with two concurrent
    instances, the "Meher" line is printed first even when that message
    arrives second, because the other instance is suspended in the sleep.

    Args:
        tasks: Stream of raw ``bytes`` values from topic ``t``.
    """
    async for payload in tasks:
        name = payload.decode('utf-8')
        if name != "Meher":
            # Awaiting (not blocking) lets another agent instance keep working.
            await asyncio.sleep(5)
            print(f"Where did {name} go?")
            continue
        print("Meher's a jerk.")
# ===============================
# In another process run
from kafka import KafkaProducer
# Publish the two demo messages in order: Waldo first, then Meher.
p = KafkaProducer()
p.send('topic_1', b'Waldo')
p.send('topic_1', b'Meher')
# send() only queues records for background delivery; flush so both are
# actually written before this short-lived script exits.
p.flush()
我是一名优秀的程序员,十分优秀!