- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章nginx lua集成kafka的实现方法由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
第一步:进入opresty目录 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
[root@node03 openresty]
# cd /export/servers/openresty/
[root@node03 openresty]
# ll
total 356
drwxr-xr-x 2 root root 4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000 4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000 4096 Nov 13 2017 bundle
-rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
-rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT
drwxr-xr-x 6 root root 4096 Jul 26 11:33 luajit
drwxr-xr-x 6 root root 4096 Aug 1 08:14 lualib
-rw-r--r-- 1 root root 5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root 4096 Jul 26 11:35 nginx
drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 patches
drwxr-xr-x 44 root root 4096 Jul 26 11:33 pod
-rw-rw-r-- 1 1000 1000 3689 Nov 13 2017 README.markdown
-rw-rw-r-- 1 1000 1000 8690 Nov 13 2017 README-win32.txt
-rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x 5 root root 4096 Jul 26 11:33 site
drwxr-xr-x 2 root root 4096 Aug 1 10:54 testlua
drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 util
[root@node03 openresty]
#
|
说明:接下来我们关注两个目录 lualib 和 nginx 。
1.lualib: 是存放opresty所需要的集成软件包的 。
2.nginx: 是nginx服务目录 。
接下来,我们进入lualib目录一看究竟:
1
2
3
4
5
6
7
8
|
[root@node03 openresty]
# cd lualib/
[root@node03 lualib]
# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root 4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root 4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root 4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root 4096 Aug 1 10:34 resty
|
这里我们看到了redis和ngx集成软件包,说明我们可以之间使用nginx和redis而无需导入任何依赖包!!!! 。
下面看看resty里面有些说明呢????
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
[root@node03 lualib]
# cd resty/
[root@node03 resty]
# ll
total 152
-rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
-rw-r--r-- 1 root root 596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka
#这是我们自己导入的
drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root 616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root 236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root 698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket
|
这里我们看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管 。
注意:这里的 kafka 这个包是没有的,说明opnresty么有集成kafka。此处我已经提前导入啦kafka集成包 。
我们看看kafka里面多有哪些包:
1
2
3
4
5
6
7
8
9
10
11
|
[root@node03 resty]
# cd kafka
[root@node03 kafka]
# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root 710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua
|
附上 kafka 集成包:kafka.rar 。
第二步:创建kafka测试lua文件 。
1.退回到openresty 。
1
|
[root@node03 kafka]
# cd /export/servers/openresty/
|
2.创建测试文件 。
1
2
|
[root@node03 openresty]
# mkdir -r testlua
#这里文件名自己取,文件位置自己定,但必须找得到
|
这里文件名自己取,文件位置自己定,但必须找得到!!!!!!!!!!!下面会用到!!!!!!!!!.
3.进入刚刚创建的文件夹并创建kafkalua.lua脚本文件 。
创建文件:vim kafkalua.lua或者touch kafkalua.lua 。
1
2
3
4
|
[root@node03 openresty]
# cd testlua/
[root@node03 testlua]
# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua
|
kafkalua.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
--测试语句可以不用
ngx.say('hello kafka file configuration successful!!!!!!')
--数据采集阈值限制,如果lua采集超过阈值,则不采集
local DEFAULT_THRESHOLD = 100000
-- kafka分区数
local PARTITION_NUM = 6
-- kafka主题名称
local TOPIC = 'B2CDATA_COLLECTION1'
-- 轮询器共享变量KEY值
local POLLING_KEY = "POLLING_KEY"
-- kafka集群(定义kafka broker地址,ip需要和kafka的host.name配置一致)
local function partitioner(key, num, correlation_id)
return tonumber(key)
end
--kafka broker列表
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
--kafka参数,
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- 共享内存计数器,用于kafka轮询使用
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
pollingVal = 1
shared_data:set(POLLING_KEY, pollingVal)
end
--获取每一条消息的计数器,对PARTITION_NUM取余数,均衡分区
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)
-- 并发控制
local isGone = true
--获取ngx.var.connections_active进行过载保护,即如果当前活跃连接数超过阈值进行限流保护
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
isGone = false
end
-- 数据采集
if isGone then
local time_local = ngx.var.time_local
if time_local == nil then
time_local = ""
end
local request = ngx.var.request
if request == nil then
request = ""
end
local request_method = ngx.var.request_method
if request_method == nil then
request_method = ""
end
local content_type = ngx.var.content_type
if content_type == nil then
content_type = ""
end
ngx.req.read_body()
local request_body = ngx.var.request_body
if request_body == nil then
request_body = ""
end
local http_referer = ngx.var.http_referer
if http_referer == nil then
http_referer = ""
end
local remote_addr = ngx.var.remote_addr
if remote_addr == nil then
remote_addr = ""
end
local http_user_agent = ngx.var.http_user_agent
if http_user_agent == nil then
http_user_agent = ""
end
local time_iso8601 = ngx.var.time_iso8601
if time_iso8601 == nil then
time_iso8601 = ""
end
local server_addr = ngx.var.server_addr
if server_addr == nil then
server_addr = ""
end
local http_cookie = ngx.var.http_cookie
if http_cookie == nil then
http_cookie = ""
end
--封装数据
local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
--引入kafka的producer
local producer = require "resty.kafka.producer"
--创建producer
local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
--发送数据
local ok, err = bp:send(TOPIC, partitions, message)
--打印错误日志
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
end
|
第三步:修改nginx配置文件nginx.conf 。
1.进入ngin/conf目录 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
[root@node03 openresty]
# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]
# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf
|
2.修改nginx.conf 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
[root@node03 conf]
# vim nginx.conf
#1.说明找到第一个server
#2.在server上面添加两行代码如下
#3.在server里面添加kafka相关的代码如下
#------------------添加的代码---------------------------------------
#开启共享字典,设置内存大小为10M,供每个nginx的线程消费
lua_shared_dict shared_data 10m;
#配置本地域名解析
resolver 127.0.0.1;
#------------------添加的代码---------------------------------------
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
#------------------添加的代码---------------------------------------
location
/kafkalua
{
#这里的kafkalua就是工程名字,不加默认为空
#开启nginx监控
stub_status on;
#加载lua文件
default_type text
/html
;
#指定kafka的lua文件位置,就是我们刚才创建的kafkalua.lua(前面已经强调要记住的!!!!)
content_by_lua_file
/export/servers/openresty/testlua/kafkalua
.lua;
}
#------------------添加的代码---------------------------------------
}
|
说明:location /kafkalua{...}这里的kafkalua是工程名,可以随意取也可以不取,但是必须要记住!!! 。
看到我们上面配置了两个location,第一个为location /{...}第二个为location /kafkalua{...}那么他们有什么区别呢???先向下看,迷雾将会慢慢揭开.
第四步:启动nginx 。
1.进入nginx/sbin 。
1
2
3
4
|
[root@node03 sbin]
# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]
# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx
|
2.测试配置文件是否正确 。
1
2
3
4
|
[root@node03 sbin]
# nginx -t
nginx: the configuration
file
/export/servers/openresty/nginx/conf/nginx
.conf syntax is ok
nginx: configuration
file
/export/servers/openresty/nginx/conf/nginx
.conf
test
is successful
#看到已经成功啦
|
3.启动nginx 。
1
2
|
[root@node03 sbin]
# nginx
#不显示任何东西一般是成功啦
|
4.查看nginx是否启动成功 。
1
2
3
4
5
6
|
[root@node03 sbin]
# ps -ef | grep nginx
root 3730 1 0 09:24 ? 00:00:00 nginx: master process nginx
nobody 3731 3730 0 09:24 ? 00:00:20 nginx: worker process is shutting down
nobody 5766 3730 0 12:17 ? 00:00:00 nginx: worker process
root 5824 3708 0 12:24 pts
/1
00:00:00
grep
nginx
<span class=
"hljs-comment"
>
#看到有两个nginx进程,表示成功le</span>
|
5.浏览器访问nginx 。
在浏览器输入:node03/kafkalua 。
说明:如何么有配置hosts则输入openresty所在设备的地址如:192.168.52.120/kafkalua 。
在浏览器输入:node03/或者 192.168.52.120/ 。
再在浏览器输入:node03:80/kafkalua 和 node03:80/试试 搬来nginx.conf来看看:
node03:80/kafkalua 这里的nide03是服务器的别名或者之间写文服务器地址,80是【listen 80;】配置的监听端口,80端口可以省略不写,如果这写成【listen 8088;】那么浏览器需输入 node03:8088/kafkalua (这里不能省略8088),kafkalua是工程名.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
#------------------添加的代码---------------------------------------
location
/kafkalua
{
#这里的kafkalua就是工程名字,不加默认为空
#开启nginx监控
stub_status on;
#加载lua文件
default_type text
/html
;
#指定kafka的lua文件位置,就是我们刚才创建的kafkalua.lua(前面已经强调要记住的!!!!)
content_by_lua_file
/export/servers/openresty/testlua/kafkalua
.lua;
}
|
第五步:创建测试爬虫程序 。
1.创建maven工程导入依赖 。
1
2
3
4
5
6
7
8
9
10
11
12
|
<
dependencies
>
<
dependency
>
<
groupId
>org.jsoup</
groupId
>
<
artifactId
>jsoup</
artifactId
>
<
version
>1.11.3</
version
>
</
dependency
>
<
dependency
>
<
groupId
>org.apache.httpcomponents</
groupId
>
<
artifactId
>httpclient</
artifactId
>
<
version
>4.5.4</
version
>
</
dependency
>
</
dependencies
>
|
2.伪爬虫程序 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
|
public
class
SpiderGoAirCN {
private
static
String basePath =
"http://node03/kafkalua"
;
public
static
void
main(String[] args)
throws
Exception {
for
(
int
i =
0
; i <
50000
; i++) {
// 请求查询信息
spiderQueryao();
// 请求html
spiderHtml();
// 请求js
spiderJs();
// 请求css
spiderCss();
// 请求png
spiderPng();
// 请求jpg
spiderJpg();
Thread.sleep(
100
);
}
}
/**
*
* @throws Exception
*/
public
static
void
spiderQueryao()
throws
Exception {
// 1.指定目标网站 ^.*/B2C40/query/jaxb/direct/query.ao.*$
String url = basePath +
"/B2C40/query/jaxb/direct/query.ao"
;
// 2.发起请求
HttpPost httpPost =
new
HttpPost(url);
// 3. 设置请求参数
httpPost.setHeader(
"Time-Local"
, getLocalDateTime());
httpPost.setHeader(
"Requst"
,
"POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"
);
httpPost.setHeader(
"Request Method"
,
"POST"
);
httpPost.setHeader(
"Content-Type"
,
"application/x-www-form-urlencoded; charset=UTF-8"
);
httpPost.setHeader(
"Referer"
,
"http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1="
+ getGoTime() +
"&at=1&ct=0&it=0"
);
httpPost.setHeader(
"Remote Address"
,
"192.168.56.80"
);
httpPost.setHeader(
"User-Agent"
,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
);
httpPost.setHeader(
"Time-Iso8601"
, getISO8601Timestamp());
httpPost.setHeader(
"Server Address"
,
"243.45.78.132"
);
httpPost.setHeader(
"Cookie"
,
"JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
+ getGoTime()
+
"%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
+ getGoTime() +
")"
);
// 4.设置请求参数
ArrayList<BasicNameValuePair> parameters =
new
ArrayList<BasicNameValuePair>();
parameters
.add(
new
BasicNameValuePair(
"json"
,
"{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"
));
httpPost.setEntity(
new
UrlEncodedFormEntity(parameters));
// 5. 发起请求
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(httpPost);
// 6.获取返回值
System.out.println(response !=
null
);
}
public
static
void
spiderHtml()
throws
Exception {
// 1.指定目标网站 ^.*html.*$
String url = basePath +
"/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0"
;
// 2.发起请求
HttpPost httpPost =
new
HttpPost(url);
// 3. 设置请求参数
httpPost.setHeader(
"Time-Local"
, getLocalDateTime());
httpPost.setHeader(
"Requst"
,
"POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"
);
httpPost.setHeader(
"Request Method"
,
"POST"
);
httpPost.setHeader(
"Content-Type"
,
"application/x-www-form-urlencoded; charset=UTF-8"
);
httpPost.setHeader(
"Referer"
,
"http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"
);
httpPost.setHeader(
"Remote Address"
,
"192.168.56.1"
);
httpPost.setHeader(
"User-Agent"
,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
);
httpPost.setHeader(
"Time-Iso8601"
, getISO8601Timestamp());
httpPost.setHeader(
"Server Address"
,
"192.168.56.80"
);
httpPost.setHeader(
"Cookie"
,
"JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"
);
// 4.设置请求参数
// httpPost.setEntity(new StringEntity(
// "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember="));
ArrayList<BasicNameValuePair> parameters =
new
ArrayList<BasicNameValuePair>();
parameters
.add(
new
BasicNameValuePair(
"json"
,
"{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"
));
httpPost.setEntity(
new
UrlEncodedFormEntity(parameters));
// 5. 发起请求
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(httpPost);
// 6.获取返回值
System.out.println(response !=
null
);
}
public
static
void
spiderJs()
throws
Exception {
// 1.指定目标网站
String url = basePath +
"/B2C40/dist/main/modules/common/requireConfig.js"
;
// 2.发起请求
HttpPost httpPost =
new
HttpPost(url);
// 3. 设置请求参数
httpPost.setHeader(
"Time-Local"
, getLocalDateTime());
httpPost.setHeader(
"Requst"
,
"POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"
);
httpPost.setHeader(
"Request Method"
,
"POST"
);
httpPost.setHeader(
"Content-Type"
,
"application/x-www-form-urlencoded; charset=UTF-8"
);
httpPost.setHeader(
"Referer"
,
"http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"
);
httpPost.setHeader(
"Remote Address"
,
"192.168.56.1"
);
httpPost.setHeader(
"User-Agent"
,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
);
httpPost.setHeader(
"Time-Iso8601"
, getISO8601Timestamp());
httpPost.setHeader(
"Server Address"
,
"192.168.56.80"
);
httpPost.setHeader(
"Cookie"
,
"JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"
);
// 4.设置请求参数
ArrayList<BasicNameValuePair> parameters =
new
ArrayList<BasicNameValuePair>();
parameters
.add(
new
BasicNameValuePair(
"json"
,
"{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"
));
httpPost.setEntity(
new
UrlEncodedFormEntity(parameters));
// 5. 发起请求
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(httpPost);
// 6.获取返回值
System.out.println(response !=
null
);
}
public
static
void
spiderCss()
throws
Exception {
// 1.指定目标网站
String url = basePath +
"/B2C40/dist/main/css/flight.css"
;
// 2.发起请求
HttpPost httpPost =
new
HttpPost(url);
// 3. 设置请求参数
httpPost.setHeader(
"Time-Local"
, getLocalDateTime());
httpPost.setHeader(
"Requst"
,
"POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"
);
httpPost.setHeader(
"Request Method"
,
"POST"
);
httpPost.setHeader(
"Content-Type"
,
"application/x-www-form-urlencoded; charset=UTF-8"
);
httpPost.setHeader(
"Referer"
,
"http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html"
);
httpPost.setHeader(
"Remote Address"
,
"192.168.56.1"
);
httpPost.setHeader(
"User-Agent"
,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
);
httpPost.setHeader(
"Time-Iso8601"
, getISO8601Timestamp());
httpPost.setHeader(
"Server Address"
,
"192.168.56.80"
);
httpPost.setHeader(
"Cookie"
,
"JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"
);
// 4.设置请求参数
ArrayList<BasicNameValuePair> parameters =
new
ArrayList<BasicNameValuePair>();
parameters
.add(
new
BasicNameValuePair(
"json"
,
"{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"
));
httpPost.setEntity(
new
UrlEncodedFormEntity(parameters));
// 5. 发起请求
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(httpPost);
// 6.获取返回值
System.out.println(response !=
null
);
}
public
static
void
spiderPng()
throws
Exception {
// 1.指定目标网站
String url =basePath +
"/B2C40/dist/main/images/common.png"
;
// 2.发起请求
HttpPost httpPost =
new
HttpPost(url);
// 3. 设置请求参数
httpPost.setHeader(
"Time-Local"
, getLocalDateTime());
httpPost.setHeader(
"Requst"
,
"POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"
);
httpPost.setHeader(
"Request Method"
,
"POST"
);
httpPost.setHeader(
"Content-Type"
,
"application/x-www-form-urlencoded; charset=UTF-8"
);
httpPost.setHeader(
"Referer"
,
"http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"
);
httpPost.setHeader(
"Remote Address"
,
"192.168.56.1"
);
httpPost.setHeader(
"User-Agent"
,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
);
httpPost.setHeader(
"Time-Iso8601"
, getISO8601Timestamp());
httpPost.setHeader(
"Server Address"
,
"192.168.56.80"
);
httpPost.setHeader(
"Cookie"
,
"JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"
);
// 4.设置请求参数
ArrayList<BasicNameValuePair> parameters =
new
ArrayList<BasicNameValuePair>();
parameters
.add(
new
BasicNameValuePair(
"json"
,
"{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"
));
httpPost.setEntity(
new
UrlEncodedFormEntity(parameters));
// 5. 发起请求
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(httpPost);
// 6.获取返回值
System.out.println(response !=
null
);
}
public
static
void
spiderJpg()
throws
Exception {
// 1.指定目标网站
String url = basePath +
"/B2C40/dist/main/images/loadingimg.jpg"
;
// 2.发起请求
HttpPost httpPost =
new
HttpPost(url);
// 3. 设置请求参数
httpPost.setHeader(
"Time-Local"
, getLocalDateTime());
httpPost.setHeader(
"Requst"
,
"POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1"
);
httpPost.setHeader(
"Request Method"
,
"POST"
);
httpPost.setHeader(
"Content-Type"
,
"application/x-www-form-urlencoded; charset=UTF-8"
);
httpPost.setHeader(
"Referer"
,
"http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0"
);
httpPost.setHeader(
"Remote Address"
,
"192.168.56.1"
);
httpPost.setHeader(
"User-Agent"
,
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
);
httpPost.setHeader(
"Time-Iso8601"
, getISO8601Timestamp());
httpPost.setHeader(
"Server Address"
,
"192.168.56.80"
);
httpPost.setHeader(
"Cookie"
,
"JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)"
);
// 4.设置请求参数
ArrayList<BasicNameValuePair> parameters =
new
ArrayList<BasicNameValuePair>();
parameters
.add(
new
BasicNameValuePair(
"json"
,
"{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"
));
httpPost.setEntity(
new
UrlEncodedFormEntity(parameters));
// 5. 发起请求
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(httpPost);
// 6.获取返回值
System.out.println(response !=
null
);
}
public
static
String getLocalDateTime() {
DateFormat df =
new
SimpleDateFormat(
"dd/MMM/yyyy'T'HH:mm:ss +08:00"
,
Locale.ENGLISH);
String nowAsISO = df.format(
new
Date());
return
nowAsISO;
}
public
static
String getISO8601Timestamp() {
DateFormat df =
new
SimpleDateFormat(
"yyyy-MM-dd'T'HH:mm:ss+08:00"
);
String nowAsISO = df.format(
new
Date());
return
nowAsISO;
}
public
static
String getGoTime() {
DateFormat df =
new
SimpleDateFormat(
"yyyy-MM-dd"
);
String nowAsISO = df.format(
new
Date());
return
nowAsISO;
}
public
static
String getBackTime() {
Date date =
new
Date();
// 取时间
Calendar calendar =
new
GregorianCalendar();
calendar.setTime(date);
calendar.add(calendar.DATE, +
1
);
// 把日期往前减少一天,若想把日期向后推一天则将负数改为正数
date = calendar.getTime();
SimpleDateFormat formatter =
new
SimpleDateFormat(
"yyyy-MM-dd"
);
String dateString = formatter.format(date);
return
dateString;
}
}
|
第六步:启动kafka 。
1.创建主题topic 。
1
2
|
[root@node01 bin]
# kafka-topics.sh --zookeeper node01:2181 --partitions 3
--replication-factor 3 --create --topic B2CDATA_COLLECTION1
|
2.开启kafka消费者 。
1
2
|
[root@node01 bin]
# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092
--topic B2CDATA_COLLECTION1
|
第七步:开启爬虫程序并观察结果 。
1.启动爬虫程序 。
2.观察消费者窗口如下 。
第八步:启动kafka-manager观察 。
1.启动kafka-manager 。
1
2
3
4
5
6
7
8
9
10
11
|
[root@node01 conf]
# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]
# ll
total 36
-rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
-rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
-rw-r--r-- 1 root root 105 May 1 06:27 log-config.bat
[root@node01 bin]
#
#启动
[root@node01 bin]
# ./kafka-manager
|
启动后的窗口:
2.浏览器访问 。
浏览器输入:node01:9000 。
kafka manager使用不做讲解,观察B2CDATA_COLLECTION1主题消费情况:
有三个分区,每个分区消费的消息差多说明成功啦, 。
如果不一样,则是kafkalua.lua 脚本中没有配置分区策略,默认分区会导致 数据倾斜 我们需配置自己的分区策略! 。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:https://www.cnblogs.com/-xiaoyu-/p/11294905.html 。
最后此篇关于nginx lua集成kafka的实现方法的文章就讲到这里了,如果你想了解更多关于nginx lua集成kafka的实现方法的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我想了解 Ruby 方法 methods() 是如何工作的。 我尝试使用“ruby 方法”在 Google 上搜索,但这不是我需要的。 我也看过 ruby-doc.org,但我没有找到这种方法。
Test 方法 对指定的字符串执行一个正则表达式搜索,并返回一个 Boolean 值指示是否找到匹配的模式。 object.Test(string) 参数 object 必选项。总是一个
Replace 方法 替换在正则表达式查找中找到的文本。 object.Replace(string1, string2) 参数 object 必选项。总是一个 RegExp 对象的名称。
Raise 方法 生成运行时错误 object.Raise(number, source, description, helpfile, helpcontext) 参数 object 应为
Execute 方法 对指定的字符串执行正则表达式搜索。 object.Execute(string) 参数 object 必选项。总是一个 RegExp 对象的名称。 string
Clear 方法 清除 Err 对象的所有属性设置。 object.Clear object 应为 Err 对象的名称。 说明 在错误处理后,使用 Clear 显式地清除 Err 对象。此
CopyFile 方法 将一个或多个文件从某位置复制到另一位置。 object.CopyFile source, destination[, overwrite] 参数 object 必选
Copy 方法 将指定的文件或文件夹从某位置复制到另一位置。 object.Copy destination[, overwrite] 参数 object 必选项。应为 File 或 F
Close 方法 关闭打开的 TextStream 文件。 object.Close object 应为 TextStream 对象的名称。 说明 下面例子举例说明如何使用 Close 方
BuildPath 方法 向现有路径后添加名称。 object.BuildPath(path, name) 参数 object 必选项。应为 FileSystemObject 对象的名称
GetFolder 方法 返回与指定的路径中某文件夹相应的 Folder 对象。 object.GetFolder(folderspec) 参数 object 必选项。应为 FileSy
GetFileName 方法 返回指定路径(不是指定驱动器路径部分)的最后一个文件或文件夹。 object.GetFileName(pathspec) 参数 object 必选项。应为
GetFile 方法 返回与指定路径中某文件相应的 File 对象。 object.GetFile(filespec) 参数 object 必选项。应为 FileSystemObject
GetExtensionName 方法 返回字符串,该字符串包含路径最后一个组成部分的扩展名。 object.GetExtensionName(path) 参数 object 必选项。应
GetDriveName 方法 返回包含指定路径中驱动器名的字符串。 object.GetDriveName(path) 参数 object 必选项。应为 FileSystemObjec
GetDrive 方法 返回与指定的路径中驱动器相对应的 Drive 对象。 object.GetDrive drivespec 参数 object 必选项。应为 FileSystemO
GetBaseName 方法 返回字符串,其中包含文件的基本名 (不带扩展名), 或者提供的路径说明中的文件夹。 object.GetBaseName(path) 参数 object 必
GetAbsolutePathName 方法 从提供的指定路径中返回完整且含义明确的路径。 object.GetAbsolutePathName(pathspec) 参数 object
FolderExists 方法 如果指定的文件夹存在,则返回 True;否则返回 False。 object.FolderExists(folderspec) 参数 object 必选项
FileExists 方法 如果指定的文件存在返回 True;否则返回 False。 object.FileExists(filespec) 参数 object 必选项。应为 FileS
我是一名优秀的程序员,十分优秀!