- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
本篇主要介绍了一种使用Rust语言编写的查询引擎——DataFusion,其使用了基于Arrow格式的内存模型,结合Rust语言本身的优势,达成了非常优秀的性能指标 。
DataFusion是一个查询引擎而非数据库,因此其本身不具备存储数据的能力。但正因为不依赖底层存储的格式,使其成为了一个灵活可扩展的查询引擎。它原生支持了查询 CSV,Parquet,Avro,Json 等存储格式,也支持了 本地,AWS S3,Azure Blob Storage,Google Cloud Storage 等多种数据源。同时还提供了丰富的扩展接口,可以方便的让我们接入自定义的数据格式和数据源.
DataFusion具有以下特性:
基于DataFusion我们可以轻松构建高性能、高质量、可扩展的数据处理系统.
DBMS是一个包含 完整 数据库管理特性的系统,主要包含以下几个模块:
DataFusion是一种查询引擎,查询引擎属于数据库管理系统的一部分。查询引擎是用户与数据库交互的主要接口,主要作用是将面向用户的高阶查询语句翻译成可被具体执行的数据处理单元操作,然后执行操作获取数据.
DataFusion查询引擎主要由以下几部分构成:
主要涉及 DFParser 和 SqlToRel 这两个 struct 。
主要涉及 LogicalPlan 和 Expr 这两个枚举类 。
主要涉及 PyhsicalPlanner 这个 trait 实现的逻辑计划到物理计划的转换,其中主要的关键点是 ExecutionPlan 和 PhysicalExpr 。
主要涉及所有执行算子,如 GroupedHashAggregateStream 。
DataFusion查询引擎的架构还是比较简单的,其中的扩展点也非常清晰,我们可以从以下几个方面对DataFusion进行扩展:
无状态方法 。
/// 逻辑表达式枚举类
pub enum Expr {
...
ScalarUDF {
/// The function
fun: Arc<ScalarUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
},
...
}
/// UDF的逻辑表达式
pub struct ScalarUDF {
/// 方法名
pub name: String,
/// 方法签名
pub signature: Signature,
/// 返回值类型
pub return_type: ReturnTypeFunction,
/// 方法实现
pub fun: ScalarFunctionImplementation,
}
/// UDF的物理表达式
pub struct ScalarFunctionExpr {
fun: ScalarFunctionImplementation,
name: String,
/// 参数表达式列表
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
}
有状态方法 。
/// 逻辑表达式枚举类
pub enum Expr {
...
AggregateUDF {
/// The function
fun: Arc<AggregateUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// Optional filter applied prior to aggregating
filter: Option<Box<Expr>>,
},
...
}
/// UADF的逻辑表达式
pub struct AggregateUDF {
/// 方法名
pub name: String,
/// 方法签名
pub signature: Signature,
/// 返回值类型
pub return_type: ReturnTypeFunction,
/// 方法实现
pub accumulator: AccumulatorFunctionImplementation,
/// 需要保存的状态的类型
pub state_type: StateTypeFunction,
}
/// UADF的物理表达式
pub struct AggregateFunctionExpr {
fun: AggregateUDF,
args: Vec<Arc<dyn PhysicalExpr>>,
data_type: DataType,
name: String,
}
Optimizer 定义了承载优化规则的结构体,其中 optimize 方法实现了逻辑计划优化的过程。优化规则列表中的每个优化规则会被以 TOP-DOWN 或 BOTTOM-UP 方式作用于逻辑计划树,优化规则列表会被实施多个轮次。我们可以通过实现 OptimizerRule 这个 trait 来实现自己的优化逻辑.
pub struct Optimizer {
/// All rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
pub trait OptimizerRule {
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>>;
...
}
/// 逻辑计划算子枚举类
pub enum LogicalPlan {
...
Extension(Extension),
...
}
/// 自定义逻辑计划算子
pub struct Extension {
/// The runtime extension operator
pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// 自定义逻辑计划算子需要实现的trait
pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }
/// 为自定义的逻辑计划算子`UserDefinedLogcialNode`生成对应的物理计划算子
pub trait ExtensionPlanner {
async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// DataFusion默认的逻辑计划到物理计划的转换器提供了自定义转换过程的结构体
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
/// 自定义物理计划算子需要实现的trait
pub trait ExecutionPlan: Debug + Send + Sync { ... }
可以看出,自定义数据源其实就是生成一个对应的ExecutionPlan执行计划,这个执行计划实施的是扫表的任务。如果数据源支持下推的能力,我们在这里可以将 projection filters limit 等操作下推到扫表时.
/// 自定义数据源需要实现的trait
pub trait TableProvider: Sync + Send {
...
async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;
...
}
pub trait CatalogProvider: Sync + Send {
...
/// 根据名称获取Schema
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
/// 注册Schema
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
}
}
pub trait SchemaProvider: Sync + Send {
...
/// 根据表名获取数据源
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
/// 注册数据源
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
}
...
}
逻辑计划其实就是数据流图,数据从叶子节点流向根节点 。
let df: DataFrame = ctx.read_table("http_api_requests_total")?
.filter(col("path").eq(lit("/api/v2/write")))?
.aggregate([col("status")]), [count(lit(1))])?;
这里我们就使用DataFusion的API接口构造了一个数据流,首先 read_table 节点会从数据源中扫描数据到内存中,然后经过 filter 节点按照条件进行过滤,最后经过 aggregate 节点进行聚合。数据流过最后的节点时,就生成了我们需要的数据.
上述链式调用的API接口实际上并没有真正执行对数据的操作,这里实际上是使用了 建造者 模式构造了逻辑计划树。最终生成的 DataFrame 实际上只是包含了一下信息:
pub struct DataFrame {
/// 查询上下文信息,包含了元数据,用户注册的UDF和UADF,使用的优化器,使用的planner等信息
session_state: SessionState,
/// 逻辑计划树的根节点
plan: LogicalPlan,
}
支持的逻辑计划算子 。
Projection
Filter
Window
Aggregate
Sort
Join
TableScan
Repartition
Union
Subquery
Limit
Extension
Distinct
Values
Explain
Analyze
SetVariable
Prepare
Dml(...)
CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView
目标:确保结果相同的情况下,执行更快 。
初始的逻辑计划,需要经过多个轮次的优化,才能生成执行效率更高的逻辑计划。DataFusion本身的优化器内置了很多优化规则,用户也可以扩展自己的优化规则.
下推(Pushdown):减少从一个节点到另一个节点的数据的行列数 。
PushDownProjection
PushDownFilter
PushDownLimit
简化(Simplify):简化表达式,减少运行时的运算。例如使用布尔代数的法则,将 b > 2 AND b > 2 简化成 b > 2 .
SimplifyExpressions
UnwrapCastInComparison
简化(Simplify):删除无用的节点 。
平铺子查询(Flatten Subqueries):将子查询用join重写 。
DecorrelateWhereExists
DecorrelatedWhereIn
ScalarSubqueryToJoin
优化join:识别join谓词 。
ExtractEqualJoinPredicate
RewriteDisjunctivePredicate
FilterNullJoinKeys
优化distinct 。
SingleDistinctToGroupBy
ReplaceDistinctWithAggregate
假设现在有这样一个谓词表达式 。
path = '/api/v2/write' or path is null 。
经过语法解析和转换后,可以用如下表达式树表示:
DataFusion在实施表达式运算时,使用了Arrow提供的向量化计算方法来加速运算 。
调用DataFusion提供的 DefaultPhysicalPlanner 中的 create_physical_plan 方法,可以将逻辑计划树转换成物理计划树。其中物理计划树中的每个节点都是一个 ExecutionPlan 。执行物理计划树时,会从根节点开始调用 execute 方法,调用该方法还没有执行对数据的操作,仅仅是将每个物理计划算子转换成一个 RecordBatchStream 算子,形成数据流算子树。这些 RecordBatchStream 算子都实现了 future 包提供的 Stream 特性,当我们最终调用 RecordBatchStream 的 collect 方法时,才会从根节点开始 poll 一次来获取一下轮要处理的数据,根节点的 poll 方法内会调用子节点的 poll 方法,最终每 poll 一次,整棵树都会进行一次数据从叶子节点到根节点的流动,生成一个 RecordBatch .
DataFusion实现的物理计划算子具有以下特性:
RecordBatch
DataFusion本身只是一个简单,高效,可扩展的查询引擎框架,用户可以将DataFusion作为开发大型数据中台的基础组件,也可以轻易地将DataFusion嵌入服务中作为查询引擎,也可以使用DataFusion构建自己的数据库系统。如果期望使用分布式的查询引擎,可以关注基于 Arrow 和 DataFusion 搭建的分布式查询引擎 Ballista .
最后此篇关于ApacheArrowDataFusion原理与架构的文章就讲到这里了,如果你想了解更多关于ApacheArrowDataFusion原理与架构的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
本文全面深入地探讨了Docker容器通信技术,从基础概念、网络模型、核心组件到实战应用。详细介绍了不同网络模式及其实现,提供了容器通信的技术细节和实用案例,旨在为专业从业者提供深入的技术洞见和实
📒博客首页:崇尚学技术的科班人 🍣今天给大家带来的文章是《Dubbo快速上手 -- 带你了解Dubbo使用、原理》🍣 🍣希望各位小伙伴们能够耐心的读完这篇文章🍣 🙏博主也在学习阶段,如若发
一、写在前面 我们经常使用npm install ,但是你是否思考过它内部的原理是什么? 1、执行npm install 它背后帮助我们完成了什么操作? 2、我们会发现还有一个成为package-lo
Base64 Base64 是什么?是将字节流转换成可打印字符、将可打印字符转换为字节流的一种算法。Base64 使用 64 个可打印字符来表示转换后的数据。 准确的来说,Base64 不算
目录 协程定义 生成器和yield语义 Future类 IOLoop类 coroutine函数装饰器 总结 tornado中的
切片,这是一个在go语言中引入的新的理念。它有一些特征如下: 对数组抽象 数组长度不固定 可追加元素 切片容量可增大 容量大小成片增加 我们先把上面的理念整理在这
文章来源:https://sourl.cn/HpZHvy 引 言 本文主要论述的是“RPC 实现原理”,那么首先明确一个问题什么是 RPC 呢?RPC 是 Remote Procedure Call
源码地址(包含所有与springmvc相关的,静态文件路径设置,request请求入参接受,返回值处理converter设置等等): spring-framework/WebMvcConfigurat
请通过简单的java类向我展示一个依赖注入(inject)原理的小例子虽然我已经了解了spring,但是如果我需要用简单的java类术语来解释它,那么你能通过一个简单的例子向我展示一下吗?提前致谢。
1、背景 我们平常使用手机和电脑上网,需要访问公网上的网络资源,如逛淘宝和刷视频,那么手机和电脑是怎么知道去哪里去拿到这个网络资源来下载到本地的呢? 就比如我去食堂拿吃的,我需要
大家好,我是飞哥! 现在 iptables 这个工具的应用似乎是越来越广了。不仅仅是在传统的防火墙、NAT 等功能出现,在今天流行的的 Docker、Kubernets、Istio 项目中也经
本篇涉及到的所有接口在公开文档中均无,需要下载 GitHub 上的源码,自己创建私有类的文档。 npm run generateDocumentation -- --private yarn gene
我最近在很多代码中注意到人们将硬编码的配置(如端口号等)值放在类/方法的深处,使其难以找到,也无法配置。 这是否违反了 SOLID 原则?如果不是,我是否可以向我的团队成员引用另一个“原则”来说明为什
我是 C#、WPF 和 MVVM 模式的新手。很抱歉这篇很长的帖子,我试图设定我所有的理解点(或不理解点)。 在研究了很多关于 WPF 提供的命令机制和 MVVM 模式的文本之后,我在弄清楚如何使用这
可比较的 jQuery 函数 $.post("/example/handler", {foo: 1, bar: 2}); 将创建一个带有 post 参数 foo=1&bar=2 的请求。鉴于 $htt
如果Django不使用“延迟查询执行”原则,主要问题是什么? q = Entry.objects.filter(headline__startswith="What") q = q.filter(
我今天发现.NET框架在做计算时遵循BODMAS操作顺序。即计算按以下顺序进行: 括号 订单 部门 乘法 添加 减法 但是我四处搜索并找不到任何文档确认 .NET 绝对 遵循此原则,是否有此类文档?如
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
API 回顾 在创建 Viewer 时可以直接指定 影像供给器(ImageryProvider),官方提供了一个非常简单的例子,即离屏例子(搜 offline): new Cesium.Viewer(
As it currently stands, this question is not a good fit for our Q&A format. We expect answers to be
我是一名优秀的程序员,十分优秀!