- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Faust - 简洁高效的 Python 流处理库由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
在分布式系统和实时数据处理中,流处理是十分重要的技术。在数据密集型应用中,数据快速到达,转瞬即逝,需要及时进行处理,流式处理强调数据和事件的处理速度,对性能和可靠性有较高的要求.
流处理框架包括:Storm,Spark Streaming 和 Flink 等,而 Kafka 也不甘示弱,推出了分布式流处理平台 Kafka Streams。 Faust 把 Kafka Streams 带到了 Python,并实现了抽象和优化,为数据和事件的流处理提供了一个高效便利的框架.
简介 。
Faust,是 robinhood 在 Github 上开源的 Python 流处理库,目前版本为 1.10.4.
Faust 把 Kafka Streams 的概念带到了 Python,提供了包括流处理和事件处理的模式。Faust 使用纯 Python 实现,使得开发者可以使用包括 NumPy, PyTorch, Pandas 等的库进行数据处理.
Faust 实现简洁优雅,使用简单,性能优秀,且具有高可用、分布式、灵活性高的特点。目前 Faust 已被用于构建高性能分布式系统和实时数据管道中.
使用 。
Faust 需求 Python 3.6 或以上,且需要可用的 Kafka >= 0.10 服务。使用 pip 安装:
$ pip install -U faust 。
此外,一些额外的特性需要额外的依赖,如 rocksdb,可以用来作为 Faust 在生产环境中的存储,以及 Redis,可以在开启缓存时使用.
安装完成以后,就可以在项目中使用了。我们来看一个简单的例子:
import faust 。
。
app = faust.App( 。
'hello-world', 。
broker='kafka://localhost:9092', 。
value_serializer='raw', 。
) 。
。
greetings_topic = app.topic('greetings') 。
。
@app.agent(greetings_topic) 。
async def greet(greetings): 。
async for greeting in greetings: 。
print(greeting) 。
首先,我们使用 faust.App 创建一个 Faust 应用,并配置应用的名字、Kafka broker 和序列化方式.
然后,我们创建一个主题,这跟 Kafka 中的主题是对应的.
Faust 利用 Python 3.6+ 的异步语法 async,定义异步函数 greet,并注册为 Faust 应用的一个 agent。函数接收实时的数据集合 greetings,并异步地对每项数据进行输出.
把上述代码保存为 hello_world.py,并在命令行启动工作者:
$ faust -A hello_world worker -l info 。
该 Faust 工作者就会从 Kafka 中实时读取数据并处理.
我们可以发送一些数据来观察效果:
$ faust -A hello_world send @greet "Hello Faust" 。
上述命令发送了一条消息,执行后,我们就能在工作者的命令行中看到这条消息.
Faust 还充分利用了 Python 的类型提示,能够方便地定义数据模型:
import faust 。
。
class Greeting(faust.Record): 。
from_name: str 。
to_name: str 。
。
app = faust.App('hello-app', broker='kafka://localhost') 。
topic = app.topic('hello-topic', value_type=Greeting) 。
。
@app.agent(topic) 。
async def hello(greetings): 。
async for greeting in greetings: 。
print(f'Hello from {greeting.from_name} to {greeting.to_name}') 。
。
@app.timer(interval=1.0) 。
async def example_sender(app): 。
await hello.send( 。
value=Greeting(from_name='Faust', to_name='you'), 。
) 。
。
if __name__ == '__main__': 。
app.main() 。
总结 。
Faust 把 Kafka Streams 带到了 Python 中,实现了简洁高效的数据流处理。其使用简单的装饰器和基于类型提示机的据模型,就能定义实现数据的处理逻辑;充分利用了 Python 的 async 异步机制,和其他高性能的异步库,实现了高效性能;其使用 Python 实现,使用开发者可以无缝对接其他数据处理和大数据相关功能.
原文地址:https://www.toutiao.com/a6939104204019778087/ 。
最后此篇关于Faust - 简洁高效的 Python 流处理库的文章就讲到这里了,如果你想了解更多关于Faust - 简洁高效的 Python 流处理库的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在用 C# 编写动态语言的解释器,并将原始函数实现为具有虚拟 Apply 方法的抽象类 Primitive,其中每个实际原始函数都是重写 Apply 的子类。 (另一种方法是只拥有类 Primit
我正在用 C# 编写动态语言的解释器,并将原始函数实现为具有虚拟 Apply 方法的抽象类 Primitive,其中每个实际原始函数都是重写 Apply 的子类。 (另一种方法是只拥有类 Primit
我是 Dapper 的新手我正在尝试了解它实际上是如何映射事物的。我有以下数据库结构: calendar | Id | Name | meeting_room | Id | Calendar_id
抱歉问题标题很糟糕。有没有办法在一行中做到这一点: Button button = (Button)Gridview.Cells[0].FindControl("controlname"); butt
在 Java 中在声明点和使用点声明列表/数组文字的tersest方法是什么? 作为次要问题,我更喜欢一种不会导致编译时警告或要求抑制警告的方法。 注意:就我个人而言,这是针对Java 8ish on
什么是现代、简洁、快速的方法来测试节点是否有任何与给定选择器匹配的子节点? “简洁”是指类似于 jQuery 或函数式风格,例如避免循环。我知道本地选择器越来越多地使用这种类型的东西,但没有跟上发展的
getFirstNotNullResult 执行函数列表,直到其中一个函数返回非空值。 如何更优雅/简洁地实现 getNotNullFirstResult? object A { def main
根据 stackoverflow 上某人的推荐,我使用了 jquery succint https://github.com/micjamking/Succinct截断我在 php 网站上的帖子。 它
我是一名优秀的程序员,十分优秀!