- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
因为最近的需求是做FlinkSQL平台,需要在实时计算平台上集成FlinkSQL功能,但目前刚刚有了研究成果,所以有了这篇笔记。
这里使用python编写的一个流,比Java简洁。
需要注意的是 pip install kakfa-python,不能是 pip install kafka。
这里生产的集群是SCRAM加密的,所以配置会多一些。
有一个单词本,words.txt就是一些英文单词,一行一个。
这个Producer每5秒产生一个记录,以JSON形式发布到流。
from kafka import KafkaProducer
import json
import random
import time
import sys
if __name__ == '__main__':
producer = KafkaProducer(
bootstrap_servers="kafka1211.slannka.com:9194",
key_serializer=lambda v: str.encode if v is not None else None,
value_serializer=lambda v: v.encode('utf-8') if v is not None else None,
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="SCRAM-SHA-256",
sasl_plain_username="slankkaCopyrightReserved",
sasl_plain_password="passwordOfUsername",
api_version=(2, 2, 1)
)
count = 0
thefile = open("data/words.txt", "rb")
while True:
buffer = thefile.read(1024 * 8192)
if not buffer:
break
count += buffer.count('\n'.encode())
thefile.close()
textfile = open("data/words.txt", "r")
lines = textfile.readlines() 读取全部内容 ,并以列表方式返回
while True:
initial values in each loop
offset = 0
word = None
get a random value represents a word
randint = random.randint(0, count)
print("total: ", count, ", randInt: ", randint)
for line in lines:
if offset == randint:
word = line.strip()
break
offset += 1
val = {
"word": word,
"len": len(word)
}
value = json.dumps(val)
print("sending:", value)
producer.send("test_enc_putong", value)
print("send finished..(wait 5s.)")
time.sleep(5.0)
producer.close(3000)
textfile.close()
create table WordCountTab (
word STRING,
len INT,
ts TIMESTAMP(3) METADATA FROM 'timestamp'这一行不支持则可以去掉
) with (
'connector' = 'kafka',
'topic' = 'test_slankka',
'properties.bootstrap.servers' = 'xxxxx.xxxxx.xxxxxx.com:9194',
'properties.group.id' = 'test_flinksql_consumer',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'properties.sasl.jaas.config'= 'org.apache.kafka.common.security.scram.ScramLoginModule required username="slankkaCopyrightReserved" password="passwordOfUsername";',
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.security.protocol' = 'SASL_PLAINTEXT'
);
create table WordCountSink (
word STRING,
len INT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysql1211.slankkaCorps.com:3306/rtc',
'table-name' = 'flink_sink_test',
'username' = 'root',
'password' = 'root'
);
INSERT INTO WordCountSink
SELECT word, len FROM WordCountTab;
执行即可,生成一个Flink JOB,这个任务会不断得写<word,len>到Mysql中。
我希望使用 API 根据处理 Q 的大小更改运行的 Web 作业实例的数量,我知道我可以在门户中设置规则,但最短聚合时间为 60 分钟,并且我如果我们突然遇到大量工作,不希望系统在扩展之前等待 60
假设我有一个 spark 应用程序并且有两个操作导致两个 spark 作业。 //spark Application //Spark Job1 .... erro
大家好! 作为我对Java的自学的一部分,我正在尝试完成可用的Java初学者分配之一here(非常古老的东西-2001) 问题是我不知道如何应对这个挑战:(我将不胜感激任何建议,因为该解决方案不再可用
我一直在使用 HADOOP 1.2.1 服务器,并在那里执行许多 pig 作业。最近,我考虑将我的 Hadoop 服务器更改为 HADOOP 2.2.0。所以我在 HADOOP 2.2.0 中尝试了一
好的,我修复了静态错误。现在我只是想找出为什么每个对象都得到相同的条目(即相同的名字、年龄、体重等)。这是代码: package classlab3b; import classlab3B.BodyM
我的家庭作业中的一个问题需要一些帮助,我已经尝试了大约一个小时,但无法运行。 列出购买商品数量超过每位顾客平均商品数量的顾客 表格如下: Customer(Cnum, CustomerName, Ad
Kubernetes Jobs重复创建 Pod,直到指定数量的容器成功终止。作业通常与更高级别的CronJob机制一起使用,该机制会按循环计划自动启动新作业。 定期使用 Jobs 和 CronJobs
我有以下工作类(我已经删除了实际的工作代码): @On("0 0 1 * * ?") public class DailyJob extends Job { @Override pub
假设您将 cron 作业配置为每分钟运行一次以做某事。如果实际任务运行时间超过一分钟会发生什么? cron 会创建另一个作业实例/线程吗?还是 cron 会等待并确保上一次运行完成? 谢谢! 最佳答案
我们正在使用 TeamCity 7 并想知道是否可以仅在前一个步骤失败时才运行步骤?我们在构建步骤配置中的选项让您可以选择仅在所有步骤都成功时执行,即使步骤失败,或者始终运行它。 有没有办法仅在前一个
我在 oracle 中编写作业以执行存储过程,但是当时机成熟时,它不会无缘无故地发生任何事情。 是否有某种日志可以让我查看是否发生了错误或其他事情? 我使用 dbms_job 包来创建作业 恩克斯。
我正在用 Java 创建一个用于文件共享的 p2p 应用程序。每个对等节点都将在我的机器上的不同端口上运行并监听请求。但我遇到的问题是,当创建 PeerNode 实例时,我的代码会进入无限循环。以下是
我正在尝试创建一个队列,但当我运行 php artisanqueue:work 时它不起作用,我在终端中得到的只是 [2017-11-30 19:56:27] Processing: App\Jobs
我正在使用PHP库phpseclib0.2.2将SSH自动化到我的一台服务器中。我将其设置为每5分钟运行一次的cron任务。 在设置完它并确保其运行等情况下注销后,我看到了以下内容: $ logout
有没有办法获取多分支管道作业扫描收集到的所有分支的名称? 我想设置一个依赖于现有构建作业的夜间构建,因此需要检查多分支作业是否包含某些特定分支。另一种方法是检查现有作业。 最佳答案 我通过使用 Jen
我在编程方面还很陌生,我不太确定如何完成分配给我的学校作业。 Write a function void print_min(unsigned char a, short b,int c),which
我的作业有问题,需要帮助! 问题 1: 完成下面的 Java 方法,以便 raiseToPower(x,n) 将数字 x 提高到整数 n 次方(即计算值 xn )。请记住 x-n = 1/xn,x0
我正在做一项家庭作业,该作业有四个文本字段和一个文本区域,以及一个将文本字段和文本区域保存到文本文件的按钮,每行一个元素。然后,应出现一个对话框通知用户文件已保存。当对话框关闭时,它应该清空文本字段和
我需要运行一个名为ArrayHolder的java程序,它将运行两个线程。 ArrayHolder 将有一个 Array。 ThreadSeven 会用 7 覆盖该 Array 的每个元素,并用 1
在我的程序中,应该读取学生姓名、ID 号和 GPA,将其分配给指定的学生,然后打印出来。一切都编译正常,但出现错误 Error: Could not find or load main class L
我是一名优秀的程序员,十分优秀!