- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
先展示一下用途和效果 。
pentaho 可读作“彭塔湖”,原名 keetle 在 keetle 被 pentaho公司 收购后改名而来.
pentaho 是一款开源 ETL 工具,纯java编写的C/S模式的工具,可绿色免安装,开箱即用。支持Windows、macOS、Linux平台.
pentaho 有2个核心设计,即 转换 和 作业 .
转换 是一个包含输入、逻辑处理、输出的完整过程,即ETL.
作业 是一个提供定时执行转换的机制,即定时服务调度.
pentaho 官网下载链接: Pentaho Community Edition Download | Hitachi Vantara 。
pentaho 由主要四部分组成 。
由于是纯java编写,依赖jdk环境。所以需要先配置jdk环境,这里省略.
从官网下载 pentaho 安装包后,直接解压.
tar -zxvf 安装包路径 -C 目标路径
tar -zxvf 安装包路径 -C 目标路径
重点目录以及执行文件说明 。
在window上运行就用 .bat 格式脚本,MacOS 或者 Linux 平台上使用 .sh 格式脚本 。
pentaho 内置了丰富的数据处理组件,本章节主要对 pentaho 界面上各个功能组件作用进行说明.
Windows 。
运行 Spoon.bat
MacOS 。
运行 Spoon.sh
Linux 。
运行 Spoon.sh
运行后会短暂没有任何反应,等待会议,就会出现界面 。
主对象树 中有 转换 和 作业 。
开始数据处理工作前,必需新建一个 转换 ,因为只有新建了之后,才能使用数据处理组件,此时的 核心对象 树是空的.
在 “核心对象树 –> 转换 –> 右键 –> 新建” 或 在 “文件 –> 新建 –> 转换” ,新建一个转换, 核心对象 树就会出现各类组件。依靠这些组件组合使用,完成数据处理工作.
一个转换就是一个数据处理工作流程。这里主要是转换的配置,例如数据源连接,运行配置等.
包含各类数据处理组件.
这里对一些常用的组件进行说明 。
输入组件,即各类数据源,例如数据库,json,xml等 。
输出组件,将处理后的数据进行输出保存 。
这是数据转换的核心,在这里完成数据处理 。
包含一些数据处理外的操作,例如发送邮件,写日志等 。
用于控制数据处理流程,例如开始,结束,终止等 。
当内置转换组件完成不了数据处理的逻辑时,即可使用脚本组件,用自定义代码的方式来完成处理逻辑 。
用于一些查询请求,例如http请求,数据库查询某个表是否存在等 。
可用于多表,单表处理完后,进行记录合并 。
在“文件–>新建–>作业”创建一个作业.
主对象树包含作业运行配置,DB连接配置等 。
核心对象树包含作业的各类组件 。
作业流程组件,有开始、转换、成功、空处理等 。
发送邮件 。
文件操作,创建、删除等 。
条件处理,例如判断某个文件是否存在 。
使用shell,js、sql等脚本处理复杂作业逻辑 。
作业处理,例如终止作业、写日志等 。
定时作业来上传、下载文件 。
上面介绍了各个组件用途,现在来完成一个完整的数据处理工作流程.
略 。
在 “核心对象树 –> 转换 –> 右键 –> 新建” 或 在 “文件 –> 新建 –> 转换” ,新建一个转换 。
在 主对象树 中选择 DB连接 ,右键新建 。
注意:连接数据库之前需要下载对应的 jdbc驱动 ,例如连接 pgsql 则需要下载 postgresql-version.jar ,r然后将驱动包放到安装目录下的 \lib 目录 。
这里以 kingbase V8 为例,因为这个踩了坑。经历如下:
内置的数据源里有 KingbaseES ,本以为可以直接用,结果发现连不上,报驱动错误。可能是因为内置的驱动版本跟数据库版本不一致,因为 Kingbase V8 的驱动不向前兼容。更新驱动后,依然不行.
然后发现,内置还有 Generic database 选项,这个是用来自定义连接内置数据源之外的数据库的。使用 jdbc 方式连接,需要一个连接串,驱动包名(前提是下载了对应的驱动包),用户名,密码。然而,这种方式依然不行…… 。
后来一想,干脆用 pgsql 的方式来连接 kingbase ,没想到连接成功! 。
因为数据源是数据库,所以这里从输入组件中选择 表输入 ,将其拖入到右侧面板中 。
双击“表输入”组件 或 右键选择 “编辑步骤” 。
点击 获取SQL查询语句 ,会弹出界面选择数据表 。
选择一个数据表后,提示 。
选择“是” 。
这里会自动填充获取数据的sql,也可以在这里加上各种where条件,获取需要的数据 。
点击“确定” 。
如果是表结构一致,则可使用 。
因为目标数据源也是数据库,所以这里选择 表输出 。从 输出组件 选择 表输出 ,拖入转换视图中 。
然后进行 步骤连接 .
方式一:按住 shift 键,鼠标左键点选“输入步骤”,会出现箭头,然后连接到“输出步骤” 。
方式二:鼠标左键框选输入和输出,然后右键,选择”新建节点连接“,选择”起始步骤“,”目标步骤“ 。
点击“确定” 。
连接后如下:
双击“表输出”或右键选择“编辑步骤” 。
选择目标数据库中的数据表,然后点击”确定“ 。
选择表输出,无法配置字段映射,所以前提是表结构一致才可使用。如果是异构表,需要字段映射的,则需要使用 插入/更新 组件 。
如果输入表和输出表结构不一致,即异构表,则需要使用 插入/更新 组件。从 输出 中选择 插入/更新 拖入转换视图中,然后进行步骤连接,进入输出配置 。
注意:一定要正确连接步骤,否则这步无法获取输入字段,输出字段 。
字段映射配置好后如下 。
点击“确定” 。
或 。
然后点击转换视图中的 按钮,这个是运行 。
这个运行是运行一次,完成后就结束了。如果要定时运行,则需要 作业 .
点击“启动” 会弹出界面 保存 当前转换 。
输入保存的文件名称,然后点击“Save”即可 。
每个步骤都显示绿色的箭头,说明没有错误,正确的执行完了转换。也可以在日志输出查看. 。
日志: 完成处理 (I=1, O=1, R=1, W=1, U=0, E=0) 中的 I=1 表示 输入 1 行,O=1表示 输出 1 行,R=1 表示 读取 1 行,W=1 表示 写入 1 行 。
然后看一下数据输出结果 。
源表 。
目标表 。
如果需要定时执行同步过程,那么就需要引入 作业 。在“文件–>新建–>作业” 创建一个作业.
在“通用”中选择 Start 拖入作业视图中 。
然后选择 转换 拖入视图,并进行步骤连接.
双击“转换”或右键选择“编辑作业入口” 。
点击“确定” 。
然后选 成功 组件拖入视图,并连接步骤 。
双击视图中的 Start 组件或右键”编辑作业入口“,进行作业调度配置 。
点击运行视图中的 按钮。一个定时作业即完成 。
定时作业调度期间,程序不能退出!程序退出,作业即停止 。
至此一个完整的数据处理作业完成了.
本部分对 简单同步 进行说明。 简单同步 是指不涉及复杂计算、转换等同步工作.
即一对一同步,A表数据同步到B表,A与B的字段数量、类型、名称可能都不一样,因此需要一些字段类型转换,这都很容易.
处理过程详见 2.5章节 。
即2个及以上的表往一个表同步,同样也需要字段映射、类型转换等操作.
这种通过某个字段(外键)关联的表,处理思路是在获取数据时,通过sql联表查询,获取到全部需要的数据。然后用单表同步方式进行处理.
如果是异构表的话,获取到每个数据源后,使用 Multiway merge join 多路合并组件处理 。
合并后的记录可作为一个单表,然后进行单表同步的处理 。
合并是笛卡尔积,即A表n条记录,B表n条记录,结果就是n x n条记录,字段是A、B表全部字段,这种方式不建议采用,会消耗更多内存资源。建议拆分成单表同步 。
如果是同构表的话,可拆分为多个单表同步处理.
本部分对涉及到数据计算、转换的同步工作进行说明。有些复杂操作,无法直接使用组件进行处理,需要用到 Script 组件。这里主要对如何使用脚本组件完成数据处理进行说明.
这里先展示一个实际案例 。
这个过程是多表同步到一个表、涉及到字段类型转换、补充字段和值、数据计算、增补数据。由于计算和增补数据使用内置组件无法完成,因此这里使用了 java脚本 组件,自定义代码进行数据处理.
这里对 字段类型转换 、 增加列 、 给某列设置值 、 java脚本 进行说明.
例如 数字类型 转为 字符串,字符串 转为 日期时间…… 。
从 转换 中选择 字段选择 组件 。
双击“字段选择”或右键选择“编辑步骤” 。
选择“元数据”,在“字段名称”列选择字段,然后在“类型”列选择目标类型 。
在 输入 中找到 生成随机数 组件,拖入视图并连接步骤 。
双击 生成随机数 或右键选择“编辑步骤” 。
在“名称”列输入需要增加的字段名,类型选择生成随机数规则 。
点击“确定”后,运行转换,在 preview data 处可预览数据,可以看到增加了一列 uid 也有值 。
例如将上面随机数组件生成的值设置为常量1。在 转换 中选择 将字段设置为常量 组件,并连接步骤 。
双击“将字段设置为常量”或右键选择“编辑步骤” 。
在“字段”列选择需要设置的字段,这里选择上一步骤生成的“uid”字段,在“值替换”列输入值.
点击“确定”,运行转换,然后预览数据,可以看到uid的值被替换为1 。
脚本 有Java脚本、JavaScript脚本,SQL脚本等。这里使用Java脚本,脚本的目的是处理内置组件处理不了的逻辑。例如有10个地层,但是数据源中只记录了前9个地层,最后一个需要根据计算得到.
拖入 Java脚本 组件到转换视图中并连接步骤 。
双击“Java脚本”或右键选择“编辑步骤” 。
然后展开 Code Snippits\Common use 。
选择 Main 拖入右侧编辑区,Main是整个脚本处理入口 。
其默认脚本结构如下 。
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
// 代码逻辑区域
/* TODO: Your code here. (Using info fields)
FieldHelper infoField = get(Fields.Info, "info_field_name");
RowSet infoStream = findInfoRowSet("info_stream_tag");
Object[] infoRow = null;
int infoRowCount = 0;
// Read all rows from info step before calling getRow() method, which returns first row from any
// input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
while((infoRow = getRowFrom(infoStream)) != null){
// do something with info data
infoRowCount++;
}
*/
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
r = createOutputRow(r, data.outputRowMeta.size());
/* TODO: Your code here. (See Sample)
// Get the value from an input field
String foobar = get(Fields.In, "a_fieldname").getString(r);
foobar += "bar";
// Set a value in a new output field
get(Fields.Out, "output_fieldname").setValue(r, foobar);
*/
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
}
TODO 区域就是代码编辑区域,其它是默认脚本函数 。
点击”确定“,然后再次打开 Java脚本 ,就能看到输入输出字段信息了 。
完整实现地层计算并补充最后一层的 Java脚本 代码逻辑如下 。
// 这里是需要用的 java API 所导入的包
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.*;
import java.math.BigDecimal;
import java.util.*;
// 核心处理过程入口
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
logBasic("----------------------字段-------------------------");
String projectCount = "project_count";
String knumber = "knumber";
String depth = "depth";
String dep = "dep";
String layerorder = "layerorder";
String id = "id";
logBasic("----------------------获取输入流-------------------------");
// 输入数据流 input 是消息步骤中设置的标签名
RowSet infoStream = findInfoRowSet("input");
Object[] infoRow = null;
int infoRowCount = 0;
logBasic("----------------------遍历数据流,将其加载到map-------------------------");
// 遍历数据流,将其加载到map,便于操作
// 根据 项目索引+钻孔索引 分组
Map<String, ArrayList<Object[]>> groups = new HashMap<String, ArrayList<Object[]>>();
while((infoRow = getRowFrom(infoStream)) != null){
// 获取字段值
String prjCode = get(TransformClassBase.Fields.In, projectCount).getString(infoRow);
String drillCode = get(TransformClassBase.Fields.In, knumber).getString(infoRow);
String groupKey = prjCode + drillCode;
if (!groups.containsKey(groupKey)) {
logBasic("----------------------创建分组-------------------------");
groups.put(groupKey,new ArrayList<Object[]>());
}
logBasic("----------------------添加数据到分组-------------------------");
ArrayList<Object[]> objects = (ArrayList<Object[]>)groups.get(groupKey);
objects.add(infoRow);
logBasic("----------------------添加数据到输出流-------------------------");
// 将当前行拷贝一份
Object[] row=infoRow;
// 创建一个输出行
row = createOutputRow(infoRow, data.outputRowMeta.size());
//putRow(infoStream.getRowMeta(), row);
// 将输出行添加到输出数据集
putRow(data.outputRowMeta, row);
infoRowCount++;
}
logBasic("----------------------分组完成,处理最后一条数据-------------------------");
// 将最后一条数据拷贝一份,场地分层索引+1,层底深度dep 赋值为 钻孔深度 depth,然后将此行数数据添加
Object[] keys = groups.keySet().toArray();
for (int i = 0; i < keys.length; i++) {
String s = keys[i].toString();
logBasic("----------------------当前分组-----------------------:"+ s);
ArrayList<Object[]> list = (ArrayList<Object[]>) groups.get(s);
Object[] last = (Object[])list.get(list.size() - 1);
Object[] newLast=last;
// 设置 layerorder 的值
String layerorderVal = get(TransformClassBase.Fields.In, layerorder).getString(last);
BigDecimal v = new BigDecimal(layerorderVal);
v = v.add(new BigDecimal(1));
get(TransformClassBase.Fields.Out, layerorder).setValue(newLast, v);
// 设置 dep 的值
String layerDepVal = get(TransformClassBase.Fields.In, depth).getString(last);
BigDecimal v2 = new BigDecimal(layerDepVal);
get(TransformClassBase.Fields.Out, dep).setValue(newLast, v2);
// 设置id
String idVal = UUID.randomUUID().toString();
get(TransformClassBase.Fields.Out, id).setValue(newLast, idVal);
logBasic("----------------------添加数据到输出流-------------------------");
newLast=createOutputRow(newLast, data.outputRowMeta.size());
// 将新的一行数据添加到输出数据集
putRow(data.outputRowMeta, newLast);
}
/* TODO: Your code here. (Using info fields)
FieldHelper infoField = get(Fields.Info, "info_field_name");
RowSet infoStream = findInfoRowSet("info_stream_tag");
Object[] infoRow = null;
int infoRowCount = 0;
// Read all rows from info step before calling getRow() method, which returns first row from any
// input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.
while((infoRow = getRowFrom(infoStream)) != null){
// do something with info data
infoRowCount++;
}
*/
}
Object[] r = getRow();
logBasic("----------------------getRow-----------------------:"+ r);
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
r = createOutputRow(r, data.outputRowMeta.size());
logBasic("----------------------createOutputRow-----------------------:"+ r);
/* TODO: Your code here. (See Sample)
// Get the value from an input field
String foobar = get(Fields.In, "a_fieldname").getString(r);
foobar += "bar";
// Set a value in a new output field
get(Fields.Out, "output_fieldname").setValue(r, foobar);
*/
// Send the row on to the next step.
putRow(data.outputRowMeta, r);
return true;
}
至此 Java脚本 处理完成.
痛(坑)点总结:
1.脚本编辑区是个文本编辑框,不能像IDEA一样帮助写代码,只能通过日志进行输出验证逻辑 。
2.建议通用的不涉及pentaho的java代码操作,可以在IDEA中完成,然后拷贝到脚本编辑区。例如需要导入的包就是在IDEA中通过智能导入,然后拷贝的 。
验证一下数据,图中标记的行,就是根据前2行数据计算而来,然后进行补充的。在数据源中只记录了前2行数据.
最后此篇关于pentaho(keetle)使用手册的文章就讲到这里了,如果你想了解更多关于pentaho(keetle)使用手册的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我想将浏览文件页面设置为 Pentaho BI 服务器的主页。默认情况下,我们需要单击浏览文件按钮才能查看我们的文件。我想在我的主页上看到我的文件 有人可以帮帮我吗? 最佳答案 1.) 在文本编辑器中
我是 Pentaho 数据集成的新手;我需要将一个数据库作为 ETL 作业集成到另一个位置。我想计算 ETL 作业期间插入/更新的次数,并将该计数插入到另一个表中。谁能帮我解决这个问题吗? 最佳答案
我正在以非常基本的方式使用水壶。我想要做的是从 csv 文件中读取,在用户定义的 Java 类步骤中进行某种转换并将输出写入文本文件。 a picture http://imageshack.com/
我有一个案例,我将 X 行数据库名称作为我需要连接的参数。我已经成功地完成了工作和转换,为作为参数给出的每个数据库名称重复表输入步骤。因此,当这些 dbnames 恰好有效时,这一切都很好地工作。但是
这可能是一个基本问题,但我想知道 Pentaho 数据集成中保存的转换在哪里。目前,我正在连接到一个存储库,我的所有工作和转换都保存在那里。我希望能够通过电子邮件将特定转换发送给另一个人。我在服务器上
当我们通过 shell 脚本直接调用 .ktr 文件来运行 Pentaho 转换(.ktr 文件)时,有没有办法指定日志记录级别(基本/最小)等?默认值是多少? 最佳答案 来自 Pan 文档: "设置
我想使用 Pentaho 报表设计器使用 命令行 自动化整个报表生成过程。有没有可能在Pentaho中实现? 输入 = 存储在数据库中的结果数据Output = 使用 Report designer
需要将多个事实表关联到一个蒙德里安立方体。模式工作台不允许这样做。我们怎样才能做到这一点? 最佳答案 您不能在多维数据集中添加多个事实表。 Schema Workbench 希望您有一个星型模式,其中
pentaho 文档 ( http://wiki.pentaho.com/display/EAI/Job+checkpoints+and+restartability ) 指定,从版本 5.0 开始,
我正在尝试在 pentaho cde 中制作数据表。在pentaho中组件的名称是Table Component。 问题是我尝试执行没有特定列数的动态查询,并出现以下错误: DataTables wa
任何人都可以提供在pentaho数据集成中获取变量和从结果步骤获取行的示例吗? 我的工作需要进行两次转变。 第一个转换采用样本输入并生成样本输出,最后我将行复制到结果步骤。 我的第二个转换从结果步骤中
我正在研究 PDI 水壶。我们可以定义一个变量并在数据库连接名称中使用它吗?因此,如果将来我需要更改多个转换中的连接,我会只更改 kettle 属性文件中的变量值吗? 最佳答案 只需在数据库连接中使用
我正在使用水壶勺进行改造。 如何从“获取系统信息”给出固定输入日期?我看到选择昨天、一个月前等选项。但我想手动选择固定日期,例如:'2012-12-14' 我从转换中得到了一个 csv,“文本文件输出
我需要参数化我的水壶作业和转换中的所有变量(作业将在 AWS 中运行,所有参数都作为环境变量传入)。 我在作业中的连接、路径和各种其他参数及其伴随的转换使用 ${SOURCE_DB_PASSWORD}
我知道我可以通过在vi模式下打开spoon.sh来检查Pentaho版本,但是安装在生产环境中的Pentaho没有该信息。可能在我之前的某个员工已经编辑过它。那么有没有其他方法可以让我知道服务器上正在
我是 Pentaho 新手,需要将表格从行转置为列,但第一列不包含标题。 它看起来像这样: Jan/15 Feb/15 Mar/15 Apr/15 1.1 3.4 1.7
我必须处理一个包含多个聚合级别的电子表格。大多数情况下,这很好,但在一种情况下,我需要将最高聚合级别的信息与下一个聚合级别的信息结合使用。这是一个例子: Title, Platform
我在这个报告中创建了一个 CDE 参数报告我想通过 url 传递参数我的 CDE 报告链接如下 http://localhost:8080/pentaho/content/pentaho-cdf-dd
我昨天在 Redhat 机器上运行 Pentaho 5.3 Biserver 出现电源故障,当电源恢复时我试图启动它提供的 biserver HTTP Status 404 - type Status
我正在通过模式工作台或 Ivy 模式编辑器创建多维数据集(xml 模式)。 当我发布它时,我想知道模式(mondrian.xml 文件)实际保存在哪里,这些文件的位置是什么? 谢谢, 最佳答案 您使用
我是一名优秀的程序员,十分优秀!