- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
// 设置时间语议,并过滤其中的首页曝光数据
DataStream<AppLogBean> homeExposureStream = appExposureStream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AppLogBean>(Time.seconds(0)) {
@Override
public long extractTimestamp(AppLogBean element) {
return element.getTime() * 1000;
}
})
.filter(new FilterFunction<AppLogBean>() {
@Override
public boolean filter(AppLogBean value) throws Exception {
return "home_exposure".equals(value.getTopic()) && StringUtils.isNotBlank(value.getScdata());
}
});
// 获取出其中的用户id
SingleOutputStreamOperator<Tuple2<String, String>> userIdStream = homeExposureStream
.map(new MapFunction<AppLogBean, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(AppLogBean appLogBean) throws Exception {
String resultUserId = "1";
JSONObject scdataJson = JSONObject.parseObject(appLogBean.getScdata());
String user_id = scdataJson.getString("user_id");
resultUserId = user_id;
return Tuple2.of("dummy", resultUserId);
}
});
// 对用户id开窗,并统计每天的数据
SingleOutputStreamOperator<String> result = userIdStream
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
.aggregate(new UniqueVisitorAggregateFunction(), new UniqueVisitorProcessWindowFunction());
// 使用print打印数据
result.print("result>>>>>>>>>");
}
/**
* UV的窗口类
*/
public static class UniqueVisitorProcessWindowFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {
private final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
System.out.println("##### 当前的watermark为" + df.format(context.currentWatermark()));
System.out.println("##### 窗口开始时间######" + df.format(context.window().getStart()));
System.out.println("##### 窗口结束时间######" + df.format(context.window().getEnd()));
System.out.println("##### 该窗口当前统计的UV" + elements.iterator().next());
out.collect("UV " + elements.iterator().next());
}
}
/**
* UV的聚合类
*/
public static class UniqueVisitorAggregateFunction implements AggregateFunction<Tuple2<String, String>, Tuple2<Set<String>, Long>, Long> {
@Override
public Tuple2<Set<String>, Long> createAccumulator() {
return Tuple2.of(new HashSet<>(), 0L);
}
@Override
public Tuple2<Set<String>, Long> add(Tuple2<String, String> value, Tuple2<Set<String>, Long> accumulator) {
if (!accumulator.f0.contains(value.f1)) {
accumulator.f0.add(value.f1);
accumulator.f1 += 1;
}
return accumulator;
}
@Override
public Long getResult(Tuple2<Set<String>, Long> accumulator) {
return accumulator.f1;
}
@Override
public Tuple2<Set<String>, Long> merge(Tuple2<Set<String>, Long> a, Tuple2<Set<String>, Long> b) {
return null;
}
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val sensorStream: WindowedStream[SensorReading, String, TimeWindow] = env
.socketTextStream("localhost", 9999)
.map(new MyMapToSensorReading)
.keyBy(_.id)
.timeWindow(Time.seconds(5))
// 1、incremental aggregation functions(增量聚合函数)(来一条数据,计算一次)
// 1.1、ReduceFunction 增量集合函数(使用匿名内部类)
val reduceResult: DataStream[SensorReading] = sensorStream.reduce(new ReduceFunction[SensorReading] {
override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
SensorReading(value2.id, value2.timestamp, value2.temperature + value2.temperature)
}
})
// 1.2、AggregateFunction(相比reduce,优势是可以指定累加值类型,输入类型和输出类型也可以不一样)
val aggregateResult: DataStream[Long] = sensorStream.aggregate(new AggregateFunction[SensorReading, Long, Long] {
// 初始化累加值
override def createAccumulator(): Long = 0L
// 累加方法
override def add(value: SensorReading, accumulator: Long): Long = accumulator + 1
// 获取结果
override def getResult(accumulator: Long): Long = accumulator
// 分区的归并操作
override def merge(a: Long, b: Long): Long = a + b
})
// 2、full window functions(全窗口函数)
/**
* 知识点:
* 1、apply方法中,可以添加WindowFunction对象,会将该窗口中所有的数据先缓存,当时间到了一次性计算
* 2、需要设置4个类型,分别是:输入类型,输出类型,keyBy时key的类型(如果用字符串来划分key类型为Tuple,窗口类型
* 3、所有的计算都在apply中进行,可以通过window获取窗口的信息,比如开始时间,结束时间
*/
val applyResult: DataStream[(Long, Int)] = sensorStream.apply(new WindowFunction[SensorReading, (Long, Int), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
out.collect((window.getStart, input.size))
}
})
// 3、窗口函数中其他API
val otherResult: DataStream[SensorReading] = sensorStream
.allowedLateness(Time.seconds(1)) // 允许处理迟到的数据
.sideOutputLateData(new OutputTag[SensorReading]("late")) // 将迟到的数据放入侧输出流
.reduce((x, y) => SensorReading(y.id, y.timestamp, x.temperature + y.temperature))
// 获取侧输出流(侧输出流为迟到很久的数据,当allowedLateness和watermark之后还是没到的数据会放入侧输出流,可以在最后统一处理)
val sideOutputStream: DataStream[SensorReading] = otherResult.getSideOutput(new OutputTag[SensorReading]("late"))
// 打印输出
applyResult.print()
env.execute("WindowFunctionDemo")
最近做一个项目,由于是在别人框架里开发app,导致了很多限制,其中一个就是不能直接引用webservice 。 我们都知道,调用webserivice 最简单的方法就是在 "引用"
这是SDL2代码的一部分 SDL主函数 int main(int argc,char *argv[]) { ... ... bool quit=false; S
c 中的函数: PHPAPI char *php_pcre_replace(char *regex, int regex_len, ch
我有以下映射: public class SecurityMap : ClassMap { public SecurityMap() {
我在vue-lic3中使用了SCSS,但是有一个奇怪的错误,使用/ deep /会报告错误,我不想看到它。 代码运行环境 vue-cli3 + vant + scss 的CSS /deep/ .van
我在深入阅读 C# 时遇到了这个我能理解的内容: 当它被限制为引用类型时,执行的比较类型完全取决于类型参数被限制为什么。 但是不能理解这个: 如果进一步限制派生自重载 == 和 != 运算符的特定类型
Closed. This question is opinion-based。它当前不接受答案。 想改善这个问题吗?更新问题,以便editing this post用事实和引用来回答。 3年前关闭。
有人可以详细介绍关于自赋值的运算符重载中的 *this 和 const 例如: Class& Class::operator=(const Class& other) { a = other.
在向树中插入新节点时,如何填充闭包表的深度/长度列? ancestor 和 descendant 中的值是来自另一个表的 ID,表示要以树结构排列的页面。 关闭表: ancestor desce
现在我正在阅读“深入了解 C#”。缺少的一件事是完成一章后我可以解决的一系列问题。那会帮助我理解我刚刚学到的概念。 哪里可以找到适合 C#3.0 的问题集? 谢谢 最佳答案 你可以试试LINQ 101
TypeScript 给 JavaScript 扩展了类型的语法,我们可以给变量加上类型,在编译期间会做类型检查,配合编辑器还能做更准确的智能提示。此外,TypeScript 还支持了高级类型用
是否有一个单行代码来获取生成器并生成该生成器中的所有元素?例如: def Yearly(year): yield YEARLY_HEADER for month in range(1, 13)
所以我阅读了一些与“什么是方法组”相关的 StackOverflow 问题以及其他互联网文章,它们在底线都说了同样的话——方法组是“一组重载方法” ". 但是,在阅读 Jon Skeet 的“C# 深
有什么方法可以从子组件中获取子组件吗? 想象一下以下组件树: 应用程序 问题 问题选项(包含复选框) 问题选项(包含复选框) 问题选项(包含复选框) 我想从 App 访问问题选项以选中所有复选框。 参
class_eval 和 instance_eval 在定义方法等情况下是完全可以预测的。我也理解类的实例和类的单例(又名特征类)之间的区别。 但是 我无法弄清楚以下唯一的事情:比方说,出于某些策略目
我想出了如何将符号 rwx 部分读取/转换为 421 个八进制部分,这非常简单。但是当涉及到特殊字符时,我感到非常困惑。我们知道 -r-xr---wx 转换为 0543,但 -r-sr---wt 或
我怀疑我系统的 Java 版本有问题。某些应用程序出现段错误或内存不足或存在链接错误。如果我从源代码安装了 JDK,我会做类似“make test”的事情,看看哪些测试失败了。但是,看起来从源代码构建
如何克隆一个 repo(使用 libgit2 ) 我想做什么git clone确实,但有 libgit2 .我可能要问的是什么 git clone确实很深入。 这是我目前正在做的: 初始化一个repo
00、头痛的JS闭包、词法作用域? 被JavaScript的闭包、上下文、嵌套函数、this搞得很头痛,这语言设计的,感觉比较混乱,先勉强理解总结一下😂😂😂.
我开始玩 lubridate R 中的包。我注意到 now(tzone="EST")计算为: [1] "2015-08-25 13:01:08 EST" 而 now(tzone="PST")导致警告:
我是一名优秀的程序员,十分优秀!