- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数 据流进行状态计算。
有界流:相对于离线数据集
无界流:相对于实时数据
越顶层越抽象,表达含义越简明,使用越方便
越底层越具体,表达能力越丰富,使用越灵活
事件时间:数据产生的时间
处理时间:程序处理数据的时间
Flink是真正意义上的流式计算框架,基本数据模式是数据流,以及事件序列。
SparkStreaming是微批次的,通常都要设置批次大小,几百毫秒或者几秒,这一小批数据是 RDD集合,并且DAG引擎把job分为不同的Stage。
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class DataSetWordcount {
public static void main(String[] args) throws Exception {
// 1、创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2、读取数据
String path = "D:\\project\\flink\\src\\main\\resources\\wordcount.txt";
// DataSet -> Operator -> DataSource
DataSet<String> inputDataSet = env.readTextFile(path);
// 3、扁平化 + 分组 + sum
DataSet<Tuple2<String, Integer>> resultDataSet = inputDataSet.flatMap(new MyFlatMapFunction())
.groupBy(0) // (word, 1) -> 0 表示 word
.sum(1);
resultDataSet.print();
}
public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
}
在192.168.200.102 主机启动 nc -lk 9999
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordcount {
public static void main(String[] args) throws Exception {
// 1、创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、读取 socket 数据
DataStreamSource<String> inputDataStream = env.socketTextStream("192.168.200.102", 9999);
// 3、计算
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}).keyBy(0)
.sum(1);
// 4、输出
resultDataStream.print();
// 5、启动 env
env.execute();
}
}
这个问题在这里已经有了答案: Oracle: merging two different queries into one, LIKE & IN (1 个回答) 8年前关闭。 我有以下代码: case
我查阅过此页面:http://dev.mysql.com/doc/refman/5.1/en/case.html以及这个,但无法获得一个简单的程序来工作...... 更新:为了明确我想要做什么:我想从
有什么办法可以优化下面的查询吗? SELECT DATE_FORMAT(a.duedate,'%d-%b-%y') AS dte, duedate, SUM(CASE WHEN (typeofnoti
我进退两难,以下 SQL 查询的结果是什么以及它是如何工作的: SELECT ... CASE WHEN (a.FIELD=1 AND b.FIELD=2) THEN 1 WHEN
问题:输入年,月,打印对应年月的日历。 示例: 问题分析: 1,首先1970年是Unix系统诞生的时间,1970年成为Unix的元年,1970年1月1号是星期四,现在大多的手机的日历功能只能显
**摘要:**介绍了Angular中依赖注入是如何查找依赖,如何配置提供商,如何用限定和过滤作用的装饰器拿到想要的实例,进一步通过N个案例分析如何结合依赖注入的知识点来解决开发编程中会遇到的问题。 本
我想拥有自动伴侣类apply case 类的构造函数来为我执行隐式转换,但无法弄清楚如何这样做。我到处搜索,我能找到的最接近的答案是 this问题(我将解释为什么它不是我在下面寻找的)。 我有一个看起
您好,我已经浏览了“多列案例”问题,但没有看到与此相同的内容,所以我想我应该问一下。 基本上我有两个我想要连接的表(都是子查询的结果)。它们具有相同的列名称。如果我加入他们的 ID 和 SELECT
我发现了一些类型推断的非直觉行为。因此,语义等效代码的工作方式不同,具体取决于编译器推断出的有关函数返回类型的信息。当您在最小单元测试中重现此案例时,或多或少会清楚发生了什么。但我担心在编写框架代码时
CREATE TABLE test ( sts_id int , [status1] int , [status2] int , [status3] int , [status4] int ) INS
我有以下声明: SELECT Dag AS Dag, CASE Jaar WHEN 2013 THEN Levering END AS '2013', CASE
我想做的是为所有高于平均时间、平均时间和低于平均时间的游乐设施获取平均tip_portion。所以返回3行。当我运行它时,它显示: ERROR: missing FROM-clause entry
我正在尝试设置一个包含以下字段的报告: 非常需要报告来显示日期、该日期内的总记录(因此我按日期分组),然后按小时计算 12 小时工作日(从上午 8 点到晚上 8 点)我需要计算记录在这些时间内出现的时
我有这个查询 SELECT users.name FROM users LEFT JOIN weapon_stats ON users.id = weapon_stats.zp_id WHERE we
我正在尝试按收视率等级获取不同视频的计数。我有下表: vid_id views 1 6 1 10 1 900 2 850 2 125000
假设我有一个如下所示的 SQL 语句: select supplier, case when platform in (5,6) then 'mobile' when p
我有一个表测试 TestNumber (int primary key) InactiveBitwise (int) 我执行以下命令: UPDATE tests SET CASE WH
我有一个像这样的表(name=expense): id amount date 1 -1687 2014-01-02 00:00:00.0 2 11000 2014-01-02 0
我有一个 multimap 定义 typedef std::pair au_pair; //vertices typedef std::pair acq_pair; //ch qlty specifi
我有一个有点像枚举的类,它的每个实例都有一个唯一的 int 值,该值从 0 开始并在每个新实例时递增。 class MyEnumLikeClass { static int NextId =
我是一名优秀的程序员,十分优秀!