- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
本篇主要介绍了一种使用Rust语言编写的查询引擎——DataFusion,其使用了基于Arrow格式的内存模型,结合Rust语言本身的优势,达成了非常优秀的性能指标 。
DataFusion是一个查询引擎而非数据库,因此其本身不具备存储数据的能力。但正因为不依赖底层存储的格式,使其成为了一个灵活可扩展的查询引擎。它原生支持了查询 CSV,Parquet,Avro,Json 等存储格式,也支持了 本地,AWS S3,Azure Blob Storage,Google Cloud Storage 等多种数据源。同时还提供了丰富的扩展接口,可以方便的让我们接入自定义的数据格式和数据源.
DataFusion具有以下特性:
基于DataFusion我们可以轻松构建高性能、高质量、可扩展的数据处理系统.
DBMS是一个包含 完整 数据库管理特性的系统,主要包含以下几个模块:
DataFusion是一种查询引擎,查询引擎属于数据库管理系统的一部分。查询引擎是用户与数据库交互的主要接口,主要作用是将面向用户的高阶查询语句翻译成可被具体执行的数据处理单元操作,然后执行操作获取数据.
DataFusion查询引擎主要由以下几部分构成:
主要涉及 DFParser 和 SqlToRel 这两个 struct 。
主要涉及 LogicalPlan 和 Expr 这两个枚举类 。
主要涉及 PyhsicalPlanner 这个 trait 实现的逻辑计划到物理计划的转换,其中主要的关键点是 ExecutionPlan 和 PhysicalExpr 。
主要涉及所有执行算子,如 GroupedHashAggregateStream 。
DataFusion查询引擎的架构还是比较简单的,其中的扩展点也非常清晰,我们可以从以下几个方面对DataFusion进行扩展:
无状态方法 。
/// 逻辑表达式枚举类
pub enum Expr {
...
ScalarUDF {
/// The function
fun: Arc<ScalarUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
},
...
}
/// UDF的逻辑表达式
pub struct ScalarUDF {
/// 方法名
pub name: String,
/// 方法签名
pub signature: Signature,
/// 返回值类型
pub return_type: ReturnTypeFunction,
/// 方法实现
pub fun: ScalarFunctionImplementation,
}
/// UDF的物理表达式
pub struct ScalarFunctionExpr {
fun: ScalarFunctionImplementation,
name: String,
/// 参数表达式列表
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
}
有状态方法 。
/// 逻辑表达式枚举类
pub enum Expr {
...
AggregateUDF {
/// The function
fun: Arc<AggregateUDF>,
/// List of expressions to feed to the functions as arguments
args: Vec<Expr>,
/// Optional filter applied prior to aggregating
filter: Option<Box<Expr>>,
},
...
}
/// UADF的逻辑表达式
pub struct AggregateUDF {
/// 方法名
pub name: String,
/// 方法签名
pub signature: Signature,
/// 返回值类型
pub return_type: ReturnTypeFunction,
/// 方法实现
pub accumulator: AccumulatorFunctionImplementation,
/// 需要保存的状态的类型
pub state_type: StateTypeFunction,
}
/// UADF的物理表达式
pub struct AggregateFunctionExpr {
fun: AggregateUDF,
args: Vec<Arc<dyn PhysicalExpr>>,
data_type: DataType,
name: String,
}
Optimizer 定义了承载优化规则的结构体,其中 optimize 方法实现了逻辑计划优化的过程。优化规则列表中的每个优化规则会被以 TOP-DOWN 或 BOTTOM-UP 方式作用于逻辑计划树,优化规则列表会被实施多个轮次。我们可以通过实现 OptimizerRule 这个 trait 来实现自己的优化逻辑.
pub struct Optimizer {
/// All rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}
pub trait OptimizerRule {
/// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be
/// optimized by this rule.
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>>;
...
}
/// 逻辑计划算子枚举类
pub enum LogicalPlan {
...
Extension(Extension),
...
}
/// 自定义逻辑计划算子
pub struct Extension {
/// The runtime extension operator
pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// 自定义逻辑计划算子需要实现的trait
pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }
/// 为自定义的逻辑计划算子`UserDefinedLogcialNode`生成对应的物理计划算子
pub trait ExtensionPlanner {
async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
/// DataFusion默认的逻辑计划到物理计划的转换器提供了自定义转换过程的结构体
pub struct DefaultPhysicalPlanner {
extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
/// 自定义物理计划算子需要实现的trait
pub trait ExecutionPlan: Debug + Send + Sync { ... }
可以看出,自定义数据源其实就是生成一个对应的ExecutionPlan执行计划,这个执行计划实施的是扫表的任务。如果数据源支持下推的能力,我们在这里可以将 projection filters limit 等操作下推到扫表时.
/// 自定义数据源需要实现的trait
pub trait TableProvider: Sync + Send {
...
async fn scan(
&self,
state: &SessionState,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>>;
...
}
pub trait CatalogProvider: Sync + Send {
...
/// 根据名称获取Schema
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
/// 注册Schema
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
Err(DataFusionError::NotImplemented(
"Registering new schemas is not supported".to_string(),
))
}
}
pub trait SchemaProvider: Sync + Send {
...
/// 根据表名获取数据源
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
/// 注册数据源
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
}
...
}
逻辑计划其实就是数据流图,数据从叶子节点流向根节点 。
let df: DataFrame = ctx.read_table("http_api_requests_total")?
.filter(col("path").eq(lit("/api/v2/write")))?
.aggregate([col("status")]), [count(lit(1))])?;
这里我们就使用DataFusion的API接口构造了一个数据流,首先 read_table 节点会从数据源中扫描数据到内存中,然后经过 filter 节点按照条件进行过滤,最后经过 aggregate 节点进行聚合。数据流过最后的节点时,就生成了我们需要的数据.
上述链式调用的API接口实际上并没有真正执行对数据的操作,这里实际上是使用了 建造者 模式构造了逻辑计划树。最终生成的 DataFrame 实际上只是包含了一下信息:
pub struct DataFrame {
/// 查询上下文信息,包含了元数据,用户注册的UDF和UADF,使用的优化器,使用的planner等信息
session_state: SessionState,
/// 逻辑计划树的根节点
plan: LogicalPlan,
}
支持的逻辑计划算子 。
Projection
Filter
Window
Aggregate
Sort
Join
TableScan
Repartition
Union
Subquery
Limit
Extension
Distinct
Values
Explain
Analyze
SetVariable
Prepare
Dml(...)
CreateExternalTable
CreateView
CreateCatalogSchema
CreateCatalog
DropTable
DropView
目标:确保结果相同的情况下,执行更快 。
初始的逻辑计划,需要经过多个轮次的优化,才能生成执行效率更高的逻辑计划。DataFusion本身的优化器内置了很多优化规则,用户也可以扩展自己的优化规则.
下推(Pushdown):减少从一个节点到另一个节点的数据的行列数 。
PushDownProjection
PushDownFilter
PushDownLimit
简化(Simplify):简化表达式,减少运行时的运算。例如使用布尔代数的法则,将 b > 2 AND b > 2 简化成 b > 2 .
SimplifyExpressions
UnwrapCastInComparison
简化(Simplify):删除无用的节点 。
平铺子查询(Flatten Subqueries):将子查询用join重写 。
DecorrelateWhereExists
DecorrelatedWhereIn
ScalarSubqueryToJoin
优化join:识别join谓词 。
ExtractEqualJoinPredicate
RewriteDisjunctivePredicate
FilterNullJoinKeys
优化distinct 。
SingleDistinctToGroupBy
ReplaceDistinctWithAggregate
假设现在有这样一个谓词表达式 。
path = '/api/v2/write' or path is null 。
经过语法解析和转换后,可以用如下表达式树表示:
DataFusion在实施表达式运算时,使用了Arrow提供的向量化计算方法来加速运算 。
调用DataFusion提供的 DefaultPhysicalPlanner 中的 create_physical_plan 方法,可以将逻辑计划树转换成物理计划树。其中物理计划树中的每个节点都是一个 ExecutionPlan 。执行物理计划树时,会从根节点开始调用 execute 方法,调用该方法还没有执行对数据的操作,仅仅是将每个物理计划算子转换成一个 RecordBatchStream 算子,形成数据流算子树。这些 RecordBatchStream 算子都实现了 future 包提供的 Stream 特性,当我们最终调用 RecordBatchStream 的 collect 方法时,才会从根节点开始 poll 一次来获取一下轮要处理的数据,根节点的 poll 方法内会调用子节点的 poll 方法,最终每 poll 一次,整棵树都会进行一次数据从叶子节点到根节点的流动,生成一个 RecordBatch .
DataFusion实现的物理计划算子具有以下特性:
RecordBatch
DataFusion本身只是一个简单,高效,可扩展的查询引擎框架,用户可以将DataFusion作为开发大型数据中台的基础组件,也可以轻易地将DataFusion嵌入服务中作为查询引擎,也可以使用DataFusion构建自己的数据库系统。如果期望使用分布式的查询引擎,可以关注基于 Arrow 和 DataFusion 搭建的分布式查询引擎 Ballista .
最后此篇关于ApacheArrowDataFusion原理与架构的文章就讲到这里了,如果你想了解更多关于ApacheArrowDataFusion原理与架构的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
是否可以简化在裸机上运行的这条链: 具有随时间变化的副本数的 StatefulSet 服务 使用 proxy-next-upstream: "error http_502 timeout invali
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
我需要为应用程序制定架构。它专为销售产品而设计。 系统每天将接受大约 30-40k 的新产品。它将导致在表 product 中创建新记录。 系统应保留价格历史记录。用户应该能够看到产品 A 的价格在去
我需要一些帮助来理解 PHP 的内部工作原理。 还记得,在过去,我们曾经写过 TSR(Terminate and stay resident)例程(pre-windows 时代)吗?一旦该程序被执行,
1.Nginx 基础架构 nginx 启动后以 daemon 形式在后台运行,后台进程包含一个 master 进程和多个 worker 进程。如下图所示: master与
本文深入探讨了Kubernetes(K8s)的关键方面,包括其架构、容器编排、网络与存储管理、安全与合规、高可用性、灾难恢复以及监控与日志系统。 关注【TechLeadCloud】,
我知道 CNN 的工作原理,包括每一层的用途(Dropout、Pooling 等)。但是,在为新数据集设计 CNN 时,我不知道要使用多少个 Conv-Relu-Pool 层,在最终获得输出之前我应该
在基于 REST 的架构中,资源和方法之间有什么区别。有吗? 最佳答案 资源是您的应用程序定义的东西;它们与物体非常相似。方法是 HTTP 动词之一,例如 GET、POST、PUT、DELETE。它们
我想用 oneOf仅在 xyType 的值上不同的模式属性(property)。我想要其中两个:一个是 xyType设置为 "1"第二个在哪里xyType是 任何其他值 .这可以使用 json 模式完
寻求 PHP 架构师的建议! 我对 PHP 不是很熟悉,但已经接管了一个用该语言编写的大型分析包的维护工作。该架构旨在将报告的数据读取到大型键/值数组中,这些数组通过各种解析模块传递,以提取每个模块已
这些存在吗? 多年来,我一直是大型强类型面向对象语言(Java 和 C#)的奴隶,并且是 Martin Fowler 及其同类的信徒。 Javascript,由于它的松散类型和函数性质,似乎不适合我习
我已经阅读了 Manning 的 Big Data Lambda Architecture ( http://www.manning.com/marz/BD_meap_ch01.pdf ),但仍然无法
在过去的几年里,我做了相当多的 iOS 开发,所以我非常熟悉 iOS 架构和应用程序设计(一切都是一个 ViewController,您可以将其推送、弹出或粘贴到选项卡栏中)。我最近开始探索正确的 M
我有以下应用程序,我在其中循环一些数据并显示它。 {{thing.title}} {{thing.description}}
昨天我和我的伙伴讨论了我正在开发的这个电子购物网站的架构。请注意,我为此使用 ASP.NET。他非常惊讶地发现我没有将添加到购物车的项目保留在 ArrayList 或其他通用列表中,而是使用 LINQ
我正在使用在 tridion 蓝图层次结构中处于较低位置的出版物。从蓝图中较高级别的出版物继承的一些内容和模式不适合我的出版物,并且永远不会被我的出版物使用。 我将跟进添加这些项目的内部团队,并尝试说
我目前已经在 Cassandra 中设计了一个架构,但我想知道是否有更好的方法来做事情。基本上,问题在于大多数(如果不是全部)读取都是动态的。我构建了一个分段系统作为应用程序服务,读取动态自定义查询(
我正在按照 documentation 中给出的 icingaweb UI v 2.0 布局执行在服务器上设置 icinga 的步骤。 。我成功进入设置页面,该页面要求您输入 token ,然后按照步
我必须保存来自不同社交媒体的用户的不同个人资料。例如用户可能有 1 个 Facebook 和 2 个 Twitter 个人资料。如果我保存每个配置文件它作为新文档插入不同的集合中,例如 faceboo
我的团队使用 Puppet 架构,该架构目前可在多个环境(流浪者、暂存、生产)中容纳单个应用程序。 我们现在想要扩展此设置的范围以支持其他应用程序。他们中的许多人将使用我们已经定义的现有模块的子集,而
我是一名优秀的程序员,十分优秀!