kafka

http://kafka.apache.org/

阿里云兼容 kafka 消息队列:https://www.aliyun.com/product/ons

Kafka 是一种高吞吐量的分布式发布订阅消息系统,其具备分布式功能、并可以结合 zookeeper 可以实现动态扩容,用于构建实时数据管道和流应用程序
它具有水平可伸缩性、容错性、快速性

常用消息队列对比:

kafka 优势

kafka为什么性能高?

  1. 顺序写入,kafka数据写入磁盘,不是保存在内存,默认保存168小时
  • kafka通过O(1)的磁盘数据结构提供消息的持久化,即使数以TB的消息存储也能够保持长时间的稳定性能
    • O(1)是最低的时间复杂度,哈希算法就是典型的 O(1) 时间复杂度,无论数据规模多大,都可以在一次计算后找到目标
  • 高吞吐量,即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息
  • 支持通过 Kafka 服务器分区消息,可以将数据保存到不同的服务器
  • 支持 Hadoop 并行数据加载

kafka 角色

  • broker:中文直译“中间人”,实际就是消息代理,是生产者和消费者中间代理保存消息的中转站,集群中每个 kafka 的 broker 都有唯一的 id,由 server.properties 中的 broker.id 指定,可以把每个kafka节点抽象的看成是一个broker,也可以把整个kafka集群抽象的看成是一个broker

  • topic:话题,生产者和消费者监听同一个topic,生产者往里写消息,消费者从里面读消息

  • partition:分区,也叫分片,物理上的概念,每个分区对应一个文件夹,topic 可以将其消息分片储存,提高性能,然后每个分片做多个副本,保证高可用

    注意:分片数量不要超过kafka节点数量;副本数量也不要超过kafka节点数量;

    • leader:分片副本的角色,主
    • follower:分片副本的角色,从

    对于一个分片,其副本只有一个是leader,其他的都是follower,leader不能和follower在同一个节点,这样就失去了高可用的意义

    高可用:当一个节点故障,其他的follower会选举出一个作为leader

    1
    2
    3
    4
    上图中 topic1 分了两片:topic1-part1、topic1-part2;
    上图中 topic2 只有一片:topic2-part1

    上图中 topic1 和 topic2 的分片都做了三个副本:topicX-part1、topicX-part2、topicX-part3
  • Producer:生产者,负责发布消息到 Kafka broker

  • Consumer:消费者,每个 consumer 属于一个特定的 consuer group(若不指定 group name 则属于默认 group),使用 consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息

kafka 和 zookeeper 的关系:

kafka 自身无法实现集群和高可用,kafka 依赖 zookeeper 实现集群和高可用

zookeeper 和 kafka 都可以存储数据,zookeeper储存单个数据在1MB以内,只用来保存服务的元数据,不保存业务信息

  1. Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信息都会写入 Zookeeper 的 ZNode 节点中
  2. Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费。注意:kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0,9 以后 offset存储在本地
  3. Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个Leader,这个是依托于 Zookeeper 的选举机制实现的

kafka 部署

1
2
3
kakfa1.ljk.cn:10.0.1.101
kakfa2.ljk.cn:10.0.1.102
kakfa3.ljk.cn:10.0.1.103

快速部署:http://kafka.apache.org/quickstart

  1. 安装 zookeeper,这里就不配置集群了,安装单机zookeeper

  2. 安装 kafka

    1
    2
    3
    4
    5
    6
    # kafka 下载页面:http://kafka.apache.org/downloads
    [root@kakfa1 src]$tar -xzf kafka_2.13-2.7.0.tgz
    [root@kakfa1 src]$mv kafka_2.13-2.7.0 /usr/local/kafka
    [root@kakfa1 src]$cd /usr/local/
    [root@kakfa1 local]$scp -r ./kafka/ 10.0.1.102:/usr/local
    [root@kakfa1 local]$scp -r ./kafka/ 10.0.1.103:/usr/local
  3. 配置 kafka

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    [root@kakfa1 ~]$vim /etc/hosts
    ...
    10.0.1.101 kafka1.ljk.cn
    10.0.1.102 kafka2.ljk.cn
    10.0.1.103 kafka3.ljk.cn
    10.0.1.101 zk1.ljk.cn # zookeeper地址域名解析

    [root@kakfa1 local]$vim kafka/config/server.properties
    21 broker.id=1 # 每个 broker 在集群中的唯一标识,正整数
    31 listeners=PLAINTEXT://kafka1.ljk.cn:9092
    60 log.dirs=/usr/local/kafka/kafka-logs # kakfa用于保存数据的目录,所有的消都会存储在该目录当中
    65 num.partitions=1 # 设置创建新topic的默认分区数量
    103 log.retention.hours=168 # 设置kafka中消息保留时间,默认为168小时,即7天
    # 指定连接的zookeeper的地址,zk中存储了broker的元数据信息,如果zk是集群,多个zk地址使用逗号分割,这里为了方便,使用单机zookeeper,推荐使用域名,如果使用ip可能无法启动,不知道为什么
    123 zookeeper.connect=zk1.ljk.cn:2181
    126 zookeeper.connection.timeout.ms=6000 # 设置连接zookeeper的超时时间,默认6s

    [root@kakfa2 local]$vim kafka/config/server.properties
    21 broker.id=2
    31 listeners=PLAINTEXT://10.0.1.102:9092
    60 log.dirs=/usr/local/kafka/kafka-logs
    103 log.retention.hours=168
    123 zookeeper.connect=zk1.ljk.cn:2181
    126 zookeeper.connection.timeout.ms=6000

    [root@kakfa3 local]$vim kafka/config/server.properties
    21 broker.id=3
    31 listeners=PLAINTEXT://10.0.1.103:9092
    60 log.dirs=/usr/local/kafka/kafka-logs
    103 log.retention.hours=168
    123 zookeeper.connect=zk1.ljk.cn:2181
    126 zookeeper.connection.timeout.ms=6000
  4. 启动 kafka

    1
    2
    3
    4
    5
    6
    7
    [root@kakfa1 bin]$pwd
    /usr/local/kafka/bin
    [root@kakfa1 bin]$./kafka-server-start.sh -daemon ../config/server.properties

    [root@kakfa2 bin]$./kafka-server-start.sh -daemon ../config/server.properties

    [root@kakfa3 bin]$./kafka-server-start.sh -daemon ../config/server.properties

测试 kafka 读写数据

http://kafka.apache.org/quickstart

创建 topic

1
2
3
4
5
6
[root@kakfa1 bin]$./kafka-topics.sh --create \
--zookeeper zk1.ljk.cn:2181 \
--partitions 3 \
--replication-factor 3 \
--topic lujinkai
Created topic lujinkai.
  • –create:创建topic
  • –zookeeper:指定zk地址,虽然配置文件中已经指定了,但是命令行还要指定
  • –partitions:指定一个topic包含几个partition,就是对topic分片,分片可以提高性能,但是一般不用分片,保持默认值1就可以,如果分片,也不要超过节点的数量
  • –replication-factor:指定partition的副本数量,kafka实现高可用全靠partition的副本,如果设置3,则一个partition就存储3份,注意不是4份
  • –topic:指定名称

假设集群有4个broker,一个topic有4个partition,每个partition有3个副本。下图是每个broker上的副本分配情况:

验证 topic

1
2
3
4
5
6
7
[root@kakfa1 bin]$./kafka-topics.sh --describe \
--zookeeper zk1.ljk.cn:2181 \
--topic lujinkai
Topic: lujinkai PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: lujinkai Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: lujinkai Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: lujinkai Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3

说明:lujinkai 这个 topic 有三个分区分别为 0、1、2,分区 0 的 leader 是 3(broker.id),分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)

获取所有 topic

1
2
[root@kakfa1 bin]$./kafka-topics.sh --list --zookeeper zk1.ljk.cn:2181
lujinkai

测试发送消息

1
2
3
4
5
[root@kakfa1 bin]$./kafka-console-producer.sh --topic lujinkai \
--broker-list kafka1.ljk.cn:9092,kafka2.ljk.cn:9092,kafka3.ljk.cn:9092
>hello
>word
>

测试获取消息

1
2
3
4
5
[root@kafka2 bin]$./kafka-console-consumer.sh --topic lujinkai \
--bootstrap-server kafka1.ljk.cn:9092,kafka2.ljk.cn:9092,kafka3.ljk.cn:9092 \
--from-beginning
hello
word
  • –bootstrap-server:kafak集群的地址,实际只写一个地址也行
  • –from-beginning:从最开始的数据进行消费

删除 topic

1
2
3
4
5
[root@kakfa1 bin]$./kafka-topics.sh --delete \
--topic lujinkai \
--zookeeper zk1.ljk.cn:2181
Topic lujinkai is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.