python使用pykafka操作kafka

Kafka 是一种分布式的、分区的、多副本的基于发布/订阅的消息系统。它是通过 zookeeper 进行协调,常见可以用于 web/nginx 日志、访问日志、消息服务等。主要应用场景为:日志收集系统和消息系统。
Kafka 的主要设计目标如下:
1. 以时间复杂度为 O(1) 的方式提供持久化能力,即使对 TB 级别以上的数据也能保证常数时间的访问性能。
2. 高吞吐率,即使在十分廉价的机器上也能实现单机支持每秒 100K 条消息的传输。
3. 支持 Kafka Server (即 Kafka 集群的服务器)间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输。
4. 同时支持离线数据处理和实时数据处理

20170803141826479.png

如上图所示,一个 Kafka 集群由若干producer、若干consumer、若干broker,以及一个zookeeper集群所组成。Kafka通过zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
Kafka相关名词解释

broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群,相当于物理层面上的一台服务器。
topic:存放同一类消息的位置,是一个概念层面上的名词,Kafka集群可以负责多个topic的分发。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
partition:topic在物理层面上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列,创建topic时可以指定partition数量,每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件。一般来说partition的数量大于等于broker的数量。
producer:负责发布消息到Kafka broker
consumer:消费消息,每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则为默认的group)。使用consumer high level API时,同一topic的一条消息只能被一个consumer group的一个consumer消费,但多个consumer group可同时消费这条消息。
consumer group:每个consumer属于一个特定的consumer group,consumer group是实际记录的概念。

Kafka数据传输的事务特点
1. at most once
这种模式下consumer fetch消息,先进行commit,再进行处理。如果再处理消息的过程中出现异常,下次重新开始工作就无法读到之前已经确认而未处理的消息。
2. at least once
消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。消费者fetch消息,然后处理消息,然后保存offset。如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是”at least once”,原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。
3. exactly once
消息只会发送一次,Kafka中并没有严格的去实现,我们认为这种策略在Kafka中是没有必要的。
通常情况下,Kafka默认保证at least once。
4. Push & Pull
作为一个消息系统,Kafka遵循了传统的方式,选择由producer向broker push消息,并由consumer从broker中pull消息。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标就是以尽可能快的速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现是拒绝服务以及网络拥塞。而pull模式可以根据consumer的消费能力以适当的速率消费消息。
5. Topic & Partition
Topic在逻辑上可以认为是一个存在的queue,每条消息都必须指定它的topic,可以简单的理解为必须指明把这条消息放进哪个queue里。为了使Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。

每个日志文件都是"log entries"序列,每一个log entry包含一个4字节整型值(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置,也是对数据的唯一标识,Kafka中并没有提供额外的索引机制来存储offset,因为在Kafka中几乎不允许对消息进行"随机读写"。磁盘上log entry的存储格式如下:

message length:4 bytes
"magic" value:1 byte
crc:4 bytes
payload:n bytes

这个log entries并非由一个文件构成,而是分为多个segment,每个segment名为该segment第一条消息的offset和".kafka"组成。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围。

wKiom1TG-fSDqT_xAAC5GU22wjw301.png

因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘比随机写内存还要高,这是Kafka高吞吐率的一个保证)。
每条消息被发送到topic时,会根据指定的partition规则选择被存储到哪一个partition。如果partition规则设计的合理,所有的消息会均匀分配到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时,可以在$KAFKA_HOME/config/server.properties中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。

# The default number of log partitions per topic. More partitions allow greater  
# parallelism for consumption, but this will also result in more files across  
# the brokers.  
num.partitions=3

在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断这个将这条消息发送到哪个partition。paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。(比如如果一个key能够被解析为整数,那么将对应的整数与partition总数取余,可以作为该消息被发送到的partition id)

6. 历史数据删除机制

对于传统的message queue而言,一般会删除已经消费过的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,由于磁盘限制,不可能永久保留,因此Kafka提供两种机制去删除旧数据。一是基于时间,一是基于partition文件大小。(例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据)

这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

7. Consumer Group
对于程序员来说,consumer group 是消费 Kafka 消息队列中消息的接口,每个 consumer group 可以消费多个 topic,对于每一个 topic 可以有多个消费者实体 consumer(对应多个程序或者进程)。在消费过程中,同一个 topic 中的消息只会被同一个 consumer group 中的一个 consumer 消费,而不会出现重复订阅的情况。对于每一个 consumer group 消费 topic,可以手动commit,也可以设置参数集群自动commit(确认)消费进行的位置,保证下一次能接着从上次的位置继续消费。
consumer group 和 topic 一样,也是直接使用就能新建的。如果直接新建一个 consumer,而不指定具体的 consumer group,系统会自动的指定默认的 consumer group,并且从最老的数据(EARLIEST)位置开始消费。


下面我们来看看pykafka具体操作kafka的过程

首先安装模块

pip install pykafka

然后具体常用操作如下

#!/usr/bin/python3
#coding:utf-8
from pykafka import KafkaClient
#多个broker用逗号隔开
client = KafkaClient(hosts="122.51.230.106:9092")
topic = client.topics['nginx-logs']
#生产者
producer = topic.get_producer()
for i in range(10):
    producer.produce("test "+str(i))
    i += 1
producer.stop()

#消费者
consumer = topic.get_simple_consumer('nginx-groups', auto_commit_enable=True)
for msg in consumer:
    print(msg.offset, msg.value)

#读取所有topic
for topic in client.topics:
    print(topic)

#查看broker信息
print(client.brokers)
for n in client.brokers:
    host = client.brokers[n].host
    port = client.brokers[n].port
    id = client.brokers[n].id
    print("host=%s | port=%s | broker.id=%s " %(host,port,id))


内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://sulao.cn/post/756.html