python3操作消息队列中间件Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。

特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

高吞吐量[2]  :即使是非常普通的硬件Kafka也可以支持每秒数百万[2]  的消息

支持通过Kafka服务器和消费机集群来分区消息

支持Hadoop并行数据加载

术语:

Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer

负责发布消息到Kafka broker

Consumer

消息消费者,向Kafka broker读取消息的客户端。

Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

我们首先需要安装操作kafka的相关模块

pip install kafka
pip install kafka-python

1、生产者:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
    msg = "msg%d" % i
    producer.send('test', msg)
producer.close()

2、消费者(简单demo):

from kafka import KafkaConsumer
consumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])                       
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

启动后生产者、消费者可以正常消费。

3、消费者(消费群组)

from kafka import KafkaConsumer
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['172.21.10.136:9092'])                      
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

启动多个消费者,只有其中可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)

from kafka import KafkaConsumer
consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['172.21.10.136:9092'])                        
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest

源码定义:{'smallest': 'earliest', 'largest': 'latest'}

5、消费者(手动设置偏移量)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',bootstrap_servers=['172.21.10.136:9092'])
print consumer.partitions_for_topic("test")  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

6、消费者(订阅多个主题)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

7、消费者(手动拉取消息)

from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg
    time.sleep(1)

8、消费者(消息挂起与恢复)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        print "resume..."
        consumer.resume(TopicPartition(topic=u'test', partition=0))
        print "resume......"

pause执行后,consumer不能读取,直到调用resume后恢复。


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://sulao.cn/post/619.html