- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
**摘要:**MRS支持在大数据存储容量大、计算资源需要弹性扩展的场景下,用户将数据存储在OBS服务中,使用MRS集群仅做数据计算处理的存算分离模式。
本文分享自华为云社区《【云小课】EI第47课 MRS离线数据分析-通过Flink作业处理OBS数据》,作者:Hello EI 。
MRS支持在大数据存储容量大、计算资源需要弹性扩展的场景下,用户将数据存储在OBS服务中,使用MRS集群仅做数据计算处理的存算分离模式。
Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。
本文将向您介绍如何在MRS集群中运行Flink作业来处理OBS中存储的数据。
Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。
在本示例中,我们使用MRS集群内置的Flink WordCount作业程序,来分析OBS文件系统中保存的源数据,以统计源数据中的单词出现次数。
当然您也可以获取MRS服务样例代码工程,参考Flink开发指南开发其他Flink流作业程序。
本案例基本操作流程如下所示:
创建并购买一个包含有Flink组件的MRS集群,详情请参见购买自定义集群。
本文以购买MRS 3.1.0版本的集群为例,集群未开启Kerberos认证。
在本示例中,由于我们要分析处理OBS文件系统中的数据,因此在集群的高级配置参数中要为MRS集群绑定IAM权限委托,使得集群内组件能够对接OBS并具有对应文件系统目录的操作权限。
您可以直接选择系统默认的“MRS_ECS_DEFAULT_AGENCY”,也可以自行创建其他具有OBS文件系统操作权限的自定义委托。
集群购买成功后,在MRS集群的任一节点内,使用omm用户安装集群客户端,具体操作可参考安装并使用集群客户端。
例如客户端安装目录为“/opt/client”。
在创建Flink作业进行数据分析前,我们需要在提前准备待分析的测试数据,并将该数据上传至OBS文件系统中。
1、本地创建一个“mrs_flink_test.txt”文件,例如文件内容如下:
This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.
2、在云服务列表中选择“存储 > 对象存储服务”,登录OBS管理控制台。
3、单击“并行文件系统”,创建一个并行文件系统,并上传测试数据文件。
例如创建的文件系统名称为“mrs-demo-data”,单击系统名称,在“文件”页面中,新建一个文件夹“flink”,上传测试数据至该目录中。
则本示例的测试数据完整路径为“obs://mrs-demo-data/flink/mrs_flink_test.txt”。
4、上传数据分析应用程序。
使用管理台界面直接提交作业时,将已开发好的Flink应用程序jar文件也可以上传至OBS文件系统中,或者MRS集群内的HDFS文件系统中。
本示例中我们使用MRS集群内置的Flink WordCount样例程序,可从MRS集群的客户端安装目录中获取,即“/opt/client/Flink/flink/examples/batch/WordCount.jar”。
将“WordCount.jar”上传至“mrs-demo-data/program”目录下。
例如本示例中,我们设置为“--input obs://mrs-demo-data/flink/mrs_flink_test.txt --output obs://mrs-demo-data/flink/output”。
5.确认作业配置信息后,单击“确定”,完成作业的新增,并等待运行完成。
1、使用root用户登录集群客户端节点,进入客户端安装目录。
su - omm
cd /opt/client
source bigdata_env
2、执行以下命令验证集群是否可以访问OBS。
hdfs dfs -ls obs://mrs-demo-data/flink
3、提交Flink作业,指定源文件数据进行消费。
flink run -m yarn-cluster /opt/client/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demo-data/flink/mrs_flink_test.txt --output obs://mrs-demo/data/flink/output2
执行后结果类似如下:
...
Cluster started: Yarn cluster with application id application_1654672374562_0011
Job has been submitted with JobID a89b561de5d0298cb2ba01fbc30338bc
Program execution finished
Job with JobID a89b561de5d0298cb2ba01fbc30338bc has finished.
Job Runtime: 1200 ms
3.等待作业运行完成后,在OBS文件系统中指定的结果输出文件中可查看数据分析输出的结果。
下载“output”文件到本地并打开,可查看输出的分析结果。
a 3
and 2
batch 1
both 1
computing 2
data 2
demo 1
distribution 1
engine 1
flink 2
for 1
framework 1
is 2
it 1
mrs 1
parallel 1
processing 3
provides 1
stream 2
supports 2
test 1
that 2
this 1
unified 1
使用集群客户端命令行提交作业时,若不指定输出目录,在作业运行界面也可直接查看数据分析结果。
Job with JobID xxx has finished.
Job Runtime: xxx ms
Accumulator Results:
- e6209f96ffa423974f8c7043821814e9 (java.util.ArrayList) [31 elements]
(a,3)
(and,2)
(batch,1)
(both,1)
(computing,2)
(data,2)
(demo,1)
(distribution,1)
(engine,1)
(flink,2)
(for,1)
(framework,1)
(is,2)
(it,1)
(mrs,1)
(parallel,1)
(processing,3)
(provides,1)
(stream,2)
(supports,2)
(test,1)
(that,2)
(this,1)
(unified,1)
我希望使用 API 根据处理 Q 的大小更改运行的 Web 作业实例的数量,我知道我可以在门户中设置规则,但最短聚合时间为 60 分钟,并且我如果我们突然遇到大量工作,不希望系统在扩展之前等待 60
假设我有一个 spark 应用程序并且有两个操作导致两个 spark 作业。 //spark Application //Spark Job1 .... erro
大家好! 作为我对Java的自学的一部分,我正在尝试完成可用的Java初学者分配之一here(非常古老的东西-2001) 问题是我不知道如何应对这个挑战:(我将不胜感激任何建议,因为该解决方案不再可用
我一直在使用 HADOOP 1.2.1 服务器,并在那里执行许多 pig 作业。最近,我考虑将我的 Hadoop 服务器更改为 HADOOP 2.2.0。所以我在 HADOOP 2.2.0 中尝试了一
好的,我修复了静态错误。现在我只是想找出为什么每个对象都得到相同的条目(即相同的名字、年龄、体重等)。这是代码: package classlab3b; import classlab3B.BodyM
我的家庭作业中的一个问题需要一些帮助,我已经尝试了大约一个小时,但无法运行。 列出购买商品数量超过每位顾客平均商品数量的顾客 表格如下: Customer(Cnum, CustomerName, Ad
Kubernetes Jobs重复创建 Pod,直到指定数量的容器成功终止。作业通常与更高级别的CronJob机制一起使用,该机制会按循环计划自动启动新作业。 定期使用 Jobs 和 CronJobs
我有以下工作类(我已经删除了实际的工作代码): @On("0 0 1 * * ?") public class DailyJob extends Job { @Override pub
假设您将 cron 作业配置为每分钟运行一次以做某事。如果实际任务运行时间超过一分钟会发生什么? cron 会创建另一个作业实例/线程吗?还是 cron 会等待并确保上一次运行完成? 谢谢! 最佳答案
我们正在使用 TeamCity 7 并想知道是否可以仅在前一个步骤失败时才运行步骤?我们在构建步骤配置中的选项让您可以选择仅在所有步骤都成功时执行,即使步骤失败,或者始终运行它。 有没有办法仅在前一个
我在 oracle 中编写作业以执行存储过程,但是当时机成熟时,它不会无缘无故地发生任何事情。 是否有某种日志可以让我查看是否发生了错误或其他事情? 我使用 dbms_job 包来创建作业 恩克斯。
我正在用 Java 创建一个用于文件共享的 p2p 应用程序。每个对等节点都将在我的机器上的不同端口上运行并监听请求。但我遇到的问题是,当创建 PeerNode 实例时,我的代码会进入无限循环。以下是
我正在尝试创建一个队列,但当我运行 php artisanqueue:work 时它不起作用,我在终端中得到的只是 [2017-11-30 19:56:27] Processing: App\Jobs
我正在使用PHP库phpseclib0.2.2将SSH自动化到我的一台服务器中。我将其设置为每5分钟运行一次的cron任务。 在设置完它并确保其运行等情况下注销后,我看到了以下内容: $ logout
有没有办法获取多分支管道作业扫描收集到的所有分支的名称? 我想设置一个依赖于现有构建作业的夜间构建,因此需要检查多分支作业是否包含某些特定分支。另一种方法是检查现有作业。 最佳答案 我通过使用 Jen
我在编程方面还很陌生,我不太确定如何完成分配给我的学校作业。 Write a function void print_min(unsigned char a, short b,int c),which
我的作业有问题,需要帮助! 问题 1: 完成下面的 Java 方法,以便 raiseToPower(x,n) 将数字 x 提高到整数 n 次方(即计算值 xn )。请记住 x-n = 1/xn,x0
我正在做一项家庭作业,该作业有四个文本字段和一个文本区域,以及一个将文本字段和文本区域保存到文本文件的按钮,每行一个元素。然后,应出现一个对话框通知用户文件已保存。当对话框关闭时,它应该清空文本字段和
我需要运行一个名为ArrayHolder的java程序,它将运行两个线程。 ArrayHolder 将有一个 Array。 ThreadSeven 会用 7 覆盖该 Array 的每个元素,并用 1
在我的程序中,应该读取学生姓名、ID 号和 GPA,将其分配给指定的学生,然后打印出来。一切都编译正常,但出现错误 Error: Could not find or load main class L
我是一名优秀的程序员,十分优秀!