gpt4 book ai didi

python操作kafka实践的示例代码

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 31 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章python操作kafka实践的示例代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

1、先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import kafkaproducer
 
producer = kafkaproducer(bootstrap_servers = 'xxxx:x' )
 
msg_dict = {
   "sleep_time" : 10 ,
   "db_config" : {
     "database" : "test_1" ,
     "host" : "xxxx" ,
     "user" : "root" ,
     "password" : "root"
   },
   "table" : "msg" ,
   "msg" : "hello world"
}
msg = json.dumps(msg_dict)
producer.send( 'test_rhj' , msg, partition = 0 )
producer.close()

下面是消费者的简单代码:

?
1
2
3
4
5
6
from kafka import kafkaconsumer
 
consumer = kafkaconsumer( 'test_rhj' , bootstrap_servers = [ 'xxxx:x' ])
for msg in consumer:
   recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
   print recv

下面是结果:

python操作kafka实践的示例代码

2、如果想要完成负载均衡,就需要知道kafka的分区机制,同一个主题,可以为其分区,在生产者不指定分区的情况,kafka会将多个消息分发到不同的分区,消费者订阅时候如果不指定服务组,会收到所有分区的消息,如果指定了服务组,则同一服务组的消费者会消费不同的分区,如果2个分区两个消费者的消费者组消费,则,每个消费者消费一个分区,如果有三个消费者的服务组,则会出现一个消费者消费不到数据;如果想要消费同一分区,则需要用不同的服务组。以此为原理,我们对消费者做如下修改:

?
1
2
3
4
5
6
from kafka import kafkaconsumer
 
consumer = kafkaconsumer( 'test_rhj' , bootstrap_servers = [ 'xxxx:x' ])
for msg in consumer:
   recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
   print recv

然后我们开两个消费者进行消费,生产者分别往0分区和1分区发消息结果如下,可以看到,一个消费者只能消费0分区,另一个只能消费1分区:

python操作kafka实践的示例代码

python操作kafka实践的示例代码

3、kafka提供了偏移量的概念,允许消费者根据偏移量消费之前遗漏的内容,这基于kafka名义上的全量存储,可以保留大量的历史数据,历史保存时间是可配置的,一般是7天,如果偏移量定位到了已删除的位置那也会有问题,但是这种情况可能很小;每个保存的数据文件都是以偏移量命名的,当前要查的偏移量减去文件名就是数据在该文件的相对位置。要指定偏移量消费数据,需要指定该消费者要消费的分区,否则代码会找不到分区而无法消费,代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
from kafka import kafkaconsumer
from kafka.structs import topicpartition
 
consumer = kafkaconsumer(group_id = '123456' , bootstrap_servers = [ '10.43.35.25:4531' ])
consumer.assign([topicpartition(topic = 'test_rhj' , partition = 0 ), topicpartition(topic = 'test_rhj' , partition = 1 )])
print consumer.partitions_for_topic( "test_rhj" ) # 获取test主题的分区信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(topicpartition(topic = 'test_rhj' , partition = 0 ), 0 )
for msg in consumer:
   recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
   print recv

因为指定的便宜量为0,所以从一开始插入的数据都可以查到,而且因为指定了分区,指定的分区结果都可以消费,结果如下:

python操作kafka实践的示例代码

4、有时候,我们并不需要实时获取数据,因为这样可能会造成性能瓶颈,我们只需要定时去获取队列里的数据然后批量处理就可以,这种情况,我们可以选择主动拉取数据 。

?
1
2
3
4
5
6
7
8
9
10
11
12
from kafka import kafkaconsumer
import time
 
consumer = kafkaconsumer(group_id = '123456' , bootstrap_servers = [ '10.43.35.25:4531' ])
consumer.subscribe(topics = ( 'test_rhj' ,))
index = 0
while true:
   msg = consumer.poll(timeout_ms = 5 ) # 从kafka获取消息
   print msg
   time.sleep( 2 )
   index + = 1
   print '--------poll index is %s----------' % index

结果如下,可以看到,每次拉取到的都是前面生产的数据,可能是多条的列表,也可能没有数据,如果没有数据,则拉取到的为空:

python操作kafka实践的示例代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我.

原文链接:https://www.cnblogs.com/small-office/p/9399907.html 。

最后此篇关于python操作kafka实践的示例代码的文章就讲到这里了,如果你想了解更多关于python操作kafka实践的示例代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com