📖
R‘Notes
  • 关于本仓库/网站
  • Note
    • Golang的知识碎片
      • 关于Golang的一些碎片知识
    • LeetCode
      • LCP 47. 入场安检
      • LCR 121. 寻找目标值 - 二维数组
      • LCR 125. 图书整理 II
      • LCR 127. 跳跃训练
      • LCR 143. 子结构判断
      • LCR 159. 库存管理 III
      • LCR 161. 连续天数的最高销售额
      • LCR 170. 交易逆序对的总数
      • LCR 174. 寻找二叉搜索树中的目标节点
      • LeetCode--1. 两数之和
      • LeetCode--10. 正则表达式匹配
      • LeetCode--1004. 最大连续1的个数 III
      • LeetCode--101. 对称二叉树
      • LeetCode--1016. 子串能表示从 1 到 N 数字的二进制串
      • LeetCode--102. 二叉树的层序遍历
      • LeetCode--1027. 最长等差数列
      • LeetCode--103. 二叉树的锯齿形层序遍历
      • LeetCode--1035. 不相交的线
      • LeetCode--104. 二叉树的最大深度
      • LeetCode--1044. 最长重复子串
      • LeetCode--1049. 最后一块石头的重量 II
      • LeetCode--105. 从前序与中序遍历序列构造二叉树
      • LeetCode--1052. 爱生气的书店老板
      • LeetCode--106. 从中序与后序遍历序列构造二叉树
      • LeetCode--1092. 最短公共超序列
      • LeetCode--11. 盛最多水的容器
      • LeetCode--110. 平衡二叉树
      • LeetCode--1100. 长度为 K 的无重复字符子串
      • LeetCode--111. 二叉树的最小深度
      • LeetCode--112. 路径总和
      • LeetCode--113. 路径总和 II
      • LeetCode--1137. 第 N 个泰波那契数
      • LeetCode--114. 二叉树展开为链表
      • LeetCode--1143. 最长公共子序列
      • LeetCode--115. 不同的子序列
      • LeetCode--1151. 最少交换次数来组合所有的 1
      • LeetCode--1155. 掷骰子等于目标和的方法数
      • LeetCode--1176. 健身计划评估
      • LeetCode--1191. K 次串联后最大子数组之和
      • LeetCode--120. 三角形最小路径和
      • LeetCode--1208. 尽可能使字符串相等
      • LeetCode--121. 买卖股票的最佳时机
      • LeetCode--1218. 最长定差子序列
      • LeetCode--122. 买卖股票的最佳时机 II
      • LeetCode--1220. 统计元音字母序列的数目
      • LeetCode--123. 买卖股票的最佳时机 III
      • LeetCode--124. 二叉树中的最大路径和
      • LeetCode--125. 验证回文串
      • LeetCode--128. 最长连续序列
      • LeetCode--1289. 下降路径最小和 II
      • LeetCode--129. 求根节点到叶节点数字之和
      • LeetCode--1297. 子串的最大出现次数
      • LeetCode--1301. 最大得分的路径数目
      • LeetCode--1312. 让字符串成为回文串的最少插入次数
      • LeetCode--132. 分割回文串 II
      • LeetCode--134. 加油站
      • LeetCode--1343. 大小为 K 且平均值大于等于阈值的子数组数目
      • LeetCode--135. 分发糖果
      • LeetCode--136. 只出现一次的数字
      • LeetCode--138. 随机链表的复制
      • LeetCode--139. 单词拆分
      • LeetCode--14. 最长公共前缀
      • LeetCode--141. 环形链表
      • LeetCode--142. 环形链表 II
      • LeetCode--1423. 可获得的最大点数
      • LeetCode--143. 重排链表
      • LeetCode--144. 二叉树的前序遍历
      • LeetCode--1449. 数位成本和为目标值的最大数字
      • LeetCode--145. 二叉树的后序遍历
      • LeetCode--1456. 定长子串中元音的最大数目
      • LeetCode--1458. 两个子序列的最大点积
      • LeetCode--146. LRU 缓存
      • LeetCode--1461. 检查一个字符串是否包含所有长度为 K 的二进制子串
      • LeetCode--148. 排序链表
      • LeetCode--1493. 删掉一个元素以后全为 1 的最长子数组
      • LeetCode--15. 三数之和
      • LeetCode--151. 反转字符串中的单词
      • LeetCode--152. 最大乘积子数组【DP】
      • LeetCode--153. 寻找旋转排序数组中的最小值
      • LeetCode--155. 最小栈
      • LeetCode--1584. 连接所有点的最小费用,最小生成树模板题
      • LeetCode--1594. 矩阵的最大非负积
      • LeetCode--16. 最接近的三数之和
      • LeetCode--160. 相交链表
      • LeetCode--162. 寻找峰值
      • LeetCode--1626. 无矛盾的最佳球队
      • LeetCode--1639. 通过给定词典构造目标字符串的方案数
      • LeetCode--165. 比较版本号
      • LeetCode--1652. 拆炸弹
      • LeetCode--1671. 得到山形数组的最少删除次数
      • LeetCode--169. 多数元素
      • LeetCode--1691. 堆叠长方体的最大高度
      • LeetCode--174. 地下城游戏
      • LeetCode--1749. 任意子数组和的绝对值的最大值
      • LeetCode--1774. 最接近目标价格的甜点成本
      • LeetCode--179. 最大数
      • LeetCode--1824. 最少侧跳次数
      • LeetCode--1852. 每个子数组的数字种类数
      • LeetCode--188. 买卖股票的最佳时机 IV
      • LeetCode--1888. 使二进制字符串字符交替的最少反转次数
      • LeetCode--189. 轮转数组
      • LeetCode--19. 删除链表的倒数第 N 个结点,关于删除链表会遇见的指针问题
      • LeetCode--1937. 扣分后的最大得分
      • LeetCode--1964. 找出到每个位置为止最长的有效障碍赛跑路线
      • LeetCode--198. 打家劫舍
      • LeetCode--1981. 最小化目标值与所选元素的差
      • LeetCode--1984. 学生分数的最小差值
      • LeetCode--199. 二叉树的右视图
      • LeetCode--2. 两数相加
      • LeetCode--20. 有效的括号
      • LeetCode--200. 岛屿数量
      • LeetCode--206. 反转链表
      • LeetCode--2067. 等计数子串的数量
      • LeetCode--207. 课程表
      • LeetCode--208. 实现 Trie (前缀树)
      • LeetCode--209. 长度最小的子数组
      • LeetCode--2090. 半径为 k 的子数组平均值
      • LeetCode--21. 合并两个有序链表,关于链表的复习
      • LeetCode--210. 课程表 II
      • LeetCode--2107. 分享 K 个糖果后独特口味的数量
      • LeetCode--213. 打家劫舍 II
      • LeetCode--2134. 最少交换次数来组合所有的 1 II
      • LeetCode--2140. 解决智力问题
      • LeetCode--215. 数组中的第K个最大元素
      • LeetCode--2156. 查找给定哈希值的子串
      • LeetCode--22. 括号生成
      • LeetCode--221. 最大正方形
      • LeetCode--2218. 从栈中取出 K 个硬币的最大面值和
      • LeetCode--224. 基本计算器
      • LeetCode--225. 用队列实现栈
      • LeetCode--226. 翻转二叉树
      • LeetCode--2266. 统计打字方案数
      • LeetCode--2267. 检查是否有合法括号字符串路径
      • LeetCode--2269. 找到一个数字的 K 美丽值
      • LeetCode--227. 基本计算器 II
      • LeetCode--2291. 最大股票收益
      • LeetCode--23. 合并 K 个升序链表【堆和分治】
      • LeetCode--230. 二叉搜索树中第 K 小的元素
      • LeetCode--2304. 网格中的最小路径代价
      • LeetCode--232. 用栈实现队列
      • LeetCode--2320. 统计放置房子的方式数
      • LeetCode--2321. 拼接数组的最大分数
      • LeetCode--2328. 网格图中递增路径的数目
      • LeetCode--233. 数字 1 的个数
      • LeetCode--234. 回文链表
      • LeetCode--236. 二叉树的最近公共祖先
      • LeetCode--2369. 检查数组是否存在有效划分
      • LeetCode--2379. 得到 K 个黑块的最少涂色次数
      • LeetCode--239. 滑动窗口最大值,关于单调队列的复习
      • LeetCode--24. 两两交换链表中的节点
      • LeetCode--240. 搜索二维矩阵 II
      • LeetCode--2431. 最大限度地提高购买水果的口味
      • LeetCode--2435. 矩阵中和能被 K 整除的路径
      • LeetCode--2466. 统计构造好字符串的方案数
      • LeetCode--25. K 个一组翻转链表
      • LeetCode--2510. 检查是否有路径经过相同数量的 0 和 1
      • LeetCode--2533. 好二进制字符串的数量
      • LeetCode--256. 粉刷房子
      • LeetCode--2585. 获得分数的方法数
      • LeetCode--26. 删除有序数组中的重复项
      • LeetCode--2606. 找到最大开销的子字符串
      • LeetCode--265. 粉刷房子 II
      • LeetCode--2684. 矩阵中移动的最大次数
      • LeetCode--2787. 将一个数字表示成幂的和的方案数
      • LeetCode--279. 完全平方数【动态规划】
      • LeetCode--283. 移动零
      • LeetCode--2841. 几乎唯一子数组的最大和
      • LeetCode--287. 寻找重复数
      • LeetCode--2915. 和为目标值的最长子序列的长度
      • LeetCode--295. 数据流的中位数
      • LeetCode--2953. 统计完全子字符串
      • LeetCode--297. 二叉树的序列化与反序列化
      • LeetCode--3. 无重复字符的最长子串
      • LeetCode--30. 串联所有单词的子串
      • LeetCode--300. 最长递增子序列【DP+二分】
      • LeetCode--3082. 求出所有子序列的能量和
      • LeetCode--309. 买卖股票的最佳时机含冷冻期
      • LeetCode--3090. 每个字符最多出现两次的最长子字符串
      • LeetCode--31. 下一个排列
      • LeetCode--3180. 执行操作可获得的最大总奖励 I
      • LeetCode--3183. 达到总和的方法数量
      • LeetCode--3186. 施咒的最大总伤害
      • LeetCode--32. 最长有效括号【栈和dp】
      • LeetCode--322. 零钱兑换
      • LeetCode--328. 奇偶链表
      • LeetCode--329. 矩阵中的最长递增路径
      • LeetCode--3290. 最高乘法得分
      • LeetCode--33. 搜索旋转排序数组【直接二分】
      • LeetCode--3316. 从原字符串里进行删除操作的最多次数
      • LeetCode--3363. 最多可收集的水果数目
      • LeetCode--337. 打家劫舍 III
      • LeetCode--3393. 统计异或值为给定值的路径数目
      • LeetCode--34. 在排序数组中查找元素的第一个和最后一个位置
      • LeetCode--3418. 机器人可以获得的最大金币数
      • LeetCode--343. 整数拆分
      • LeetCode--3439. 重新安排会议得到最多空余时间 I
      • LeetCode--347. 前 K 个高频元素
      • LeetCode--347. 前 K 个高频元素Golang中的堆(containerheap)
      • LeetCode--3489. 零数组变换 IV
      • LeetCode--354. 俄罗斯套娃信封问题
      • LeetCode--3561. 移除相邻字符
      • LeetCode--3566. 等积子集的划分方案
      • LeetCode--3567. 子矩阵的最小绝对差
      • LeetCode--377. 组合总和 Ⅳ
      • LeetCode--39. 组合总和
      • LeetCode--394. 字符串解码【栈】
      • LeetCode--395. 至少有 K 个重复字符的最长子串
      • LeetCode--4. 寻找两个正序数组的中位数
      • LeetCode--40. 组合总和 II
      • LeetCode--402. 移掉 K 位数字
      • LeetCode--41. 缺失的第一个正数
      • LeetCode--415. 字符串相加
      • LeetCode--416. 分割等和子集_494. 目标和【01背包】
      • LeetCode--42. 接雨水(单调栈和双指针)
      • LeetCode--426. 将二叉搜索树转化为排序的双向链表
      • LeetCode--43. 字符串相乘
      • LeetCode--437. 路径总和 III【前缀和】
      • LeetCode--438. 找到字符串中所有字母异位词
      • LeetCode--44. 通配符匹配
      • LeetCode--440. 字典序的第K小数字
      • LeetCode--442. 数组中重复的数据
      • LeetCode--445. 两数相加 II
      • LeetCode--45. 跳跃游戏 II
      • LeetCode--450. 删除二叉搜索树中的节点
      • LeetCode--46. 全排列
      • LeetCode--460. LFU 缓存
      • LeetCode--468. 验证IP地址
      • LeetCode--47. 全排列 II
      • LeetCode--470. 用 Rand7() 实现 Rand10()
      • LeetCode--474. 一和零
      • LeetCode--48. 旋转图像
      • LeetCode--498. 对角线遍历
      • LeetCode--5. 最长回文子串
      • LeetCode--50. Pow(x, n)
      • LeetCode--509. 斐波那契数
      • LeetCode--516. 最长回文子序列
      • LeetCode--518. 零钱兑换 II
      • LeetCode--529. 扫雷游戏题解C++广搜
      • LeetCode--53. 最大子数组和
      • LeetCode--54. 螺旋矩阵
      • LeetCode--540. 有序数组中的单一元素
      • LeetCode--543. 二叉树的直径
      • LeetCode--55. 跳跃游戏
      • LeetCode--556. 下一个更大元素 III
      • LeetCode--56. 合并区间
      • LeetCode--560. 和为 K 的子数组
      • LeetCode--567. 字符串的排列
      • LeetCode--572. 另一棵树的子树
      • 583. 两个字符串的删除操作
      • LeetCode--59. 螺旋矩阵 II
      • LeetCode--61. 旋转链表
      • LeetCode--62. 不同路径
      • LeetCode--622. 设计循环队列
      • LeetCode--63. 不同路径 II
      • LeetCode--64. 最小路径和
      • LeetCode--643. 子数组最大平均数
      • LeetCode--646. 最长数对链
      • LeetCode--662. 二叉树最大宽度
      • LeetCode--673. 最长递增子序列的个数
      • LeetCode--678. 有效的括号字符串
      • LeetCode--679. 24 点游戏
      • LeetCode--683. K 个关闭的灯泡
      • LeetCode--69. x 的平方根
      • LeetCode--695. 岛屿的最大面积
      • LeetCode--7. 整数反转
      • LeetCode--70. 爬楼梯
      • LeetCode--704. 二分查找
      • LeetCode--712. 两个字符串的最小ASCII删除和
      • LeetCode--714. 买卖股票的最佳时机含手续费
      • LeetCode--718. 最长重复子数组
      • LeetCode--72. 编辑距离
      • LeetCode--739. 每日温度
      • LeetCode--74. 搜索二维矩阵
      • LeetCode--740. 删除并获得点数
      • LeetCode--746. 使用最小花费爬楼梯
      • LeetCode--75. 颜色分类
      • LeetCode--76. 最小覆盖子串
      • LeetCode--77. 组合
      • LeetCode--78. 子集
      • LeetCode--79. 单词搜索
      • LeetCode--790. 多米诺和托米诺平铺
      • LeetCode--8. 字符串转换整数 (atoi)
      • LeetCode--82. 删除排序链表中的重复元素 II
      • LeetCode--83. 删除排序链表中的重复元素
      • LeetCode--84. 柱状图中最大的矩形【单调栈】
      • LeetCode--85. 最大矩形
      • LeetCode--87. 扰乱字符串
      • LeetCode--879. 盈利计划
      • LeetCode--88. 合并两个有序数组
      • LeetCode--887. 鸡蛋掉落
      • LeetCode--91. 解码方法
      • LeetCode--912. 排序数组
      • LeetCode--918. 环形子数组的最大和
      • LeetCode--92. 反转链表 II
      • LeetCode--93. 复原 IP 地址
      • LeetCode--931. 下降路径最小和
      • LeetCode--94. 二叉树的中序遍历
      • LeetCode--958. 二叉树的完全性检验
      • LeetCode--960. 删列造序 III
      • LeetCode--97. 交错字符串
      • LeetCode--98. 验证二叉搜索树
      • LeetCode--983. 最低票价
      • LeetCode--LCR 140. 训练计划 II
      • NC--311.圆环回原点
      • NC--36进制加法
      • 补充题1. 排序奇升偶降链表
    • Redis
      • Redis基础部分
      • 在用Docker配置Redis哨兵节点的时候出现的错误及其解决
    • SQL学习记录
      • SQL碎片知识
      • 系统
        • MySQL学习笔记1【DQL和DCL】
        • MySQL学习笔记2【函数/约束/多表查询】
        • MySQL学习笔记3【事务】
        • MySQL学习笔记4【存储引擎和索引】
        • MySQL学习笔记5【SQL优化/视图/存储过程/触发器】
        • MySQL学习笔记6【锁】
        • MySQL学习笔记7【InnoDB】
    • x86汇编
      • 学习汇编随手记
    • 微服务相关
      • Nacos与gRPC
      • 【Golangnacos】nacos配置的增删查改,以及服务注册的golang实例及分析
    • 手搓
      • Whalebox(仿Docker)的爆诞
    • 操作系统
      • 操作系统碎片知识
      • MIT6.S081
        • MIT6.S081-lab1
        • MIT6.S081-lab2
        • MIT6.S081-lab3
        • MIT6.S081-lab3前置
        • MIT6.S081-lab4
        • MIT6.S081-lab4前置
        • MIT6.S081-lab5
        • MIT6.S081-lab5前置
        • MIT6.S081-lab7
        • MIT6.S081-lab7前置
        • MIT6.S081-lab8
        • MIT6.S081-lab8前置
        • MIT6.S081-lab9
        • MIT6.S081-环境搭建
    • 消息队列MQ
      • Kafka
    • 算法杂谈
      • 关于二分查找时的边界分类问题
    • 计组笔记
      • 计算机组成原理的学习笔记(1)--概述
      • 计算机组成原理的学习笔记(10)--CPU·其二 组合逻辑控制器和微程序
      • 计算机组成原理的学习笔记(11)--CPU·其三 中断和异常多处理器相关概念
      • 计算机组成原理的学习笔记(12)--总线和IO系统
      • 计算机组成原理的学习笔记(2)--数据的表示和运算·其一
      • 计算机组成原理的学习笔记(3)--数据表示与运算·其二 逻辑门和加减乘
      • 计算机组成原理的学习笔记(4)--数据的表示与运算·其三 补码的乘法以及原码补码的除法
      • 计算机组成原理的学习笔记(4)--数据的表示与运算·其三 补码的乘法以及原码补码的除法
      • 计算机组成原理的学习笔记(6)--存储器·其一 SRAMDRAMROM主存储器的初步认识
      • 计算机组成原理的学习笔记(7)--存储器·其二 容量扩展多模块存储系统外存Cache虚拟存储器
      • 计算机组成原理的学习笔记(8)--指令系统·其一 指令的组成以及数据寻址方式RISK和CISK
      • 计算机组成原理的学习笔记(9)--CPU·其一 CPU的基本概念流水线技术数据通路
    • 计算机网络
      • CS144
        • CS144 - Lab 0
        • CS144 - Lab 1
        • CS144 - Lab 2
        • CS144 - Lecture 1
        • CS144 - Lecture 2
        • CS144 - Lecture 3
由 GitBook 提供支持
在本页
  • 1. 基本知识
  • 1.1 前置知识
  • 1.2 生产者
  • 1.3 消费者
  • 1.4 调优
  1. Note
  2. 消息队列MQ

Kafka

上一页消息队列MQ下一页算法杂谈

最后更新于1个月前

1. 基本知识

1.1 前置知识

  1. topic表示一个类型/业务的数据的组

  2. 为方便扩展,提高吞吐率,一个topic分为多个partition。

  3. 配合分区的设计,提出消费者组的概念,每个消费者并行消费,同时,一个分区的数据,只能由一个消费者组的一个消费者来消费,除此之外,消费者和消费者相互独立,一个消费者消费之后,另一个消费者也可以消费这部分数据,在同一个消费者组里面,每个成员会被分配给不同的分区进行消费,在分区或者消费者变化的时候,也会对成员进行动态分配这些分区。

  4. 为提高可用性,每个partition都会有若干个副本,分为leader(当前正在执行的partition)和follower(备用的partition),当leader挂掉时,被选中的follower就会成为新的leader,跟redis的集群中的主从比较类似。

  5. kafka的部分数据存储在zookeeper中,记录了正在运行的节点以及每个分区的leader选举等信息,值得一提的是,在kafka2.8.0之后,kafka就可以不依赖于zookeeper,独立进行运行了。

如果通过客户端自动创建的话,partition默认只有一个,而我们可以在命令行输入kafka-topics.sh --topic create-test --bootstrap-server kafka-1:9092 --partitions 11 --create来创建一个有11个分区的topic,而如果是想要更新已有的topic的partition大小,应该将--create修改为--alter,如果是在docker环境中,也可以在进入容器之后输入kafka-topics.sh来查看命令的参数。其他的比如--describe用于查看topic的详细信息,--list查看所有主题。

而我们也可以在命令行进行消费者和生产者的操作,生产者输入kafka-console-producer.sh --bootstrap-server ip:port --topic [your topic],消费者则是将producer换成consumer即可,如下图所示:

除此之外,加上--from-beginning字段之后,consumer会加载所有的消息。

1.2 生产者

当生产者生产消息的时候,会经过producer->序列化器->拦截器(可选)->分区器->RecordAccumulator

分区器会将消息的数据进行分区,而对应的消息会被发到RecordAccumulator,此时还没有将数据发送,当数据积累到batch.size(默认16k)之后,Sender才会发送数据,当然,如果数据量比较少,滞留的时间超过linger.ms设定的时间,就会发送消息,但是默认linger.ms是0ms,也就是拿到消息就会立即发送数据,但实际可能因线程调度略有延迟。

当通过Sender发送数据时,会为每个Broker维护独立的请求队列。Kafka通过max.in.flight.requests.per.connection参数(默认5)控制每个Broker连接允许的最大未确认请求数。当某个Broker的in-flight请求数达到该限制时,针对该Broker的发送将暂停,直到收到对应的请求确认,之后才能继续发送新的请求。这个机制可以防止单个Broker堆积过多未确认请求,同时保证全局吞吐量,当然,如果等待的时间超过了request.timeout.ms(默认30s),生产者则会认为请求失败,随后进行重试,当然应答有三个级别,0代表无需等待数据落盘就可以应答,1代表leader收到数据就可以应答,-1代表leader和follower全都同步完毕之后,才可以应答,另外,在底层链路中,我们发送请求会通过调用selector将消息发送给kafka集群。

Go中的Kafka

相较于kafka-go还是感觉sarama好用一点,虽然不支持context。

func main() {
	brokers := []string{"localhost:29092", "localhost:29093", "localhost:29094"}
	topic := "cluster_test_topic"

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner

	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("Failed to start Kafka async producer: %v", err)
	}
	defer producer.Close()

	// 监听成功和失败的消息
	go func() {
		for msg := range producer.Successes() {
			fmt.Printf("Message sent successfully: topic:%s partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
		}
	}()

	go func() {
		for err := range producer.Errors() {
			fmt.Printf("Failed to send message: %v\n", err)
		}
	}()
	wg := sync.WaitGroup{}
	wg.Add(1000000)
	// 发送消息
	for i := 0; i < 1000; i++ {
		go func() {
			for j := 0; j < 1000; j++ {
				msg := &sarama.ProducerMessage{
					Topic: topic,
					Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka!! My id is %d", i)),
				}
				producer.Input() <- msg
				defer wg.Done()
			}
		}()
	}
	wg.Wait()
}

这里是一个简单的go的生产者客户端,可以向kafka发送异步的发送消息(取决于sarama.NewAsyncProducer这一方法,如果需要同步调用,则需要做一些修改),同时我们在启动生产者客户端的程序时,在终端上会显示我们的partition,而且可以明显的看见不同的消息,被分到了不同的partition上面,下面来细说一下分区的好处

分区(Partition)

Partition是Topic的子集,如果一个topic只设置在一个broker(机器)上面,则在传输巨大的数据量的时候,多台机器的负载不均匀,可能会导致broker压力过大,造成性能瓶颈,而且该Broker一旦故障,所有数据都会不可用,可靠性低。

所以引入了partition,一个topic可以具有多个partition,而每个partition可以存放在不同的broker上面,实现 数据分布式存储,这样,只要将消息均匀的发送到不同的partition上面,就能够实现broker的负载均衡,与此同时,默认情况下,如果消息带有key字段,那么kafka会根据这个key计算哈希值,将其放到合适的分区上面。

值得一提的是,和java客户端不同,go的Sarama客户端在不指定分区。并且不设定Key的时候,会采取轮询的策略来选择分区,而java客户端则是使用黏性分区来选择分区。

但是,我们在Sarama客户端,也可以自定义分区器,事实上,只需要自定义一个合乎规范的函数签名然后实现一个分区器的接口即可:

// 自定义分区器
type MyPartitioner struct {
	topic          string
}

var _ sarama.Partitioner = (*MyPartitioner)(nil)

// Partition implements sarama.Partitioner.
func (m *MyPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	return 0, nil
}

// RequiresConsistency implements sarama.Partitioner.
func (m *MyPartitioner) RequiresConsistency() bool {
	return true
}
// 自定义构造函数
func NewMyPartitioner(topic string) sarama.Partitioner {
	return &MyPartitioner{
		topic: topic,
	}
}

func main() {
	...
	config.Producer.Partitioner = NewMyPartitioner
	...
}

另外,我们之前提到了拦截器,拦截器事实上就是在发送消息之前要处理的事情,我们可以在Sarama客户端中通过实现func (m *MyInterceptor) OnSend(*sarama.ProducerMessage)这个方法来实现自定义的拦截器!而使用这个拦截器,

具体操作如下:

type MyInterceptor struct{}

func (m *MyInterceptor) OnSend(*sarama.ProducerMessage) {
	fmt.Println("OnSend")
}

var _ sarama.ProducerInterceptor = (*MyInterceptor)(nil)

func main() {
	...
	config.Producer.Interceptors = []sarama.ProducerInterceptor{
		&MyInterceptor{},
	}
	...
}

至于序列化器,就是把我们的消息转换成能够在kafka中进行传输的字节流,具体的逻辑在这里:

		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka!! My id is %d", i)),
			Key:   sarama.StringEncoder(fmt.Sprintf("key-%d", i)),
		}

这里将我们的消息转换成kafka能够识别的数据,然后将其发送。

生产者如何提高吞吐量?

我们之前提过,kafka默认的linger.ms设置的是0,也就是收到数据立刻发送,但是,这样虽然能实时发送信息,但是这种模式就像一个大货车一次拉一小点东西,吞吐量肯定是不够的,所以我们可以对batch.size和linger.ms这两个参数进行调整来提高吞吐量,同时也可以进行压缩,来节省内存,从而提高吞吐量,而在go-Sarama客户端,对应producer的config可以这样调整:

	config.Producer.Flush.Bytes = 16 * 1024
	config.Producer.Flush.Frequency = time.Millisecond * 50
	config.Producer.CompressionLevel = int(sarama.CompressionSnappy)

除此之外,我们还可以通过config.ChannelBufferSize来调整生产者缓冲区的大小,缓冲区的设置会影响内存占用和吞吐量,所以需要权衡利弊。

数据

我们的数据应答类型在Sarama中是这样设置的(此处以-1为例):

config.Producer.RequiredAcks = sarama.WaitForAll

而重试类型默认为int最大值,我们可以通过下面的方式来设置:

config.Producer.Retry.Max = 10
  • 可靠性:

    之前提到过,我们的应答ACK有三种模式:0,不需要等数据落盘,直接应答,1,当leader的数据落盘之后,不需要等待follower同步即可应答,-1则是需要等待leader和follower都已经同步完毕,才进行应答。

    • 0:当消息发送出去,不等待kafka的相应,就认为信息已经完成,此时如果leader挂掉或者在数据落盘过程中挂掉了,那么相对应的数据也没有了,此时就一定会导致数据丢失,是最不可靠的。

    • 1:此时leader已经将消息写入到本地,在此之前,如果leader挂掉了,发送方也会认为超时,然后重新发送,所以此时比上一种更加可靠,但是如果在同步的时候,leader挂掉了,也会造成数据丢失,允许丢失个别数据,如传输普通日志。

    • -1:此时会等待leader和所有的follower同步,才会返回ack信息,但是,缺点是如果一个follower挂掉了,就会导致整个partition重试,适用于对可靠性要求高地场景。

      怎么解决这个问题呢?

      至少一次(At-Least-Once),事实上,Leader维护了一个动态的in-sync replica set表示和leader维持同步的follower集合,如果follower长时间不向leader申请通信或者同步请求,就会被leader踢出ISR,超时时间由replica.lag.time.max.ms设定,默认30s,这样就能一定程度地解决这个问题,可以类比为心跳机制。

      但是如果所有的follower都挂掉了,事实上,也就和1模式没有区别了,所以我们数据完全可靠的条件是:ack级别-1,分区副本大于等于2,ISR最小应答副本大于等于2。

      但与此同时还有一个问题就是数据重复问题,就是当所有节点都已经同步完成,但是恰好在应答的那一刻挂掉了,然后没有受到消息,生产者又发送一次数据,此时会发送到新的leader上,造成数据重复。

  • 数据重复问题

    精确一次(Exactly-Once)上面提到了,我们的-1模式可以保证数据不丢失,但是不保证不重复,而1模式可以保证数据不重复,而不能保证数据不丢失。而Kafka通过引入了幂等性和事务这两个特性。

    • 幂等性:通过PID,Partition,SeqNumber来判断当前的消息是否重复,PID就是生产者的ID,而Partition代表分区号,SeqNumber单调递增,所以幂等性只能保证单分区单会话内不重复,如果遇见一个这些部分重复的,就会自动忽视这些消息,幂等性默认是开启的,但是仅仅只能在一个会话中保证,如果传输过程中挂掉,又重新启动了,怎么办?

      config.Producer.Idempotent = true
    • 事务:开启事务,必须要开启幂等性,由于需要保持不同会话,能够保持状态,所以我们还需要一个事务id,在发送信息时,需要先标注好事务的id,以保证不同会话的同一个消息的一致性。为了保持事务的状态,Kafka中还存在一个特殊的Topic,这个Topic中默认50个分区,将所有的事务保存到磁盘中,通过计算事务id的哈希值,我们可以找到对应的事务,并且由对应的事务协调器负责这个事务(一一对应),这样即便客户端挂掉,重启之后也能继续处理未完成的事务,或者回滚事务,保证数据一致性。

      事务底层依赖于幂等性,即便如此,当producer重启后,即便PID不同,Kafka也能根据事务ID来识别消息是否相同。除此之外,Sarama客户端使用事务的流程如下:

      func main() {
      	defer func() {
      		if err := recover(); err != nil {
      			color.Red("Error: %v", err)
      		}
      	}()
      	brokers := []string{"localhost:29092", "localhost:29093", "localhost:29094"}
      	topic := "cluster_test_topic"
      
      	config := sarama.NewConfig()
      	config.Producer.Return.Successes = true
      	config.Producer.RequiredAcks = sarama.WaitForAll
      	config.Producer.Retry.Max = 10
      	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
      	config.Producer.Idempotent = true
      	config.Net.MaxOpenRequests = 1
      	config.Producer.Transaction.ID = "my-transaction-id"
      	producer, err := sarama.NewAsyncProducer(brokers, config)
      	if err != nil {
      		log.Fatalln("Failed to start Sarama producer:", err)
      	}
      	defer producer.Close()
      
      	go func() {
      		for range producer.Successes() {
      			color.Green("Message delivered successfully")
      		}
      	}()
      	go func() {
      		for err := range producer.Errors() {
      			panic(err)
      		}
      	}()
      
      	producer.BeginTxn()
      	for i := 0; i < 100000; i++ {
      		msg := &sarama.ProducerMessage{
      			Topic: topic,
      			Value: sarama.StringEncoder("Hello kafka World!"),
      		}
      		producer.Input() <- msg
      	}
      	producer.CommitTxn()
      }
  • 数据有序:在单个分区里面,数据是有序的,但是如果消费多个分区的数据,则无法保证有序。

  • 数据乱序:之前提过,broker最多能够缓存五个请求,比如,当第三个请求失败,但是第四个请求成功了,此时就会造成乱序,有一种解决方案是将max.in,flight.request.per.connection设置为1,表示最多只能缓存一个请求,但是效率低下,但是如果启动幂等性的话,这个值就可以设置小于等于5,这就可以保证最近五个请求不乱序了,因为我们知道幂等性有一个参数是序号,所以能够解决乱序的问题。

Broker

zookeeper存储的kafka相关信息:

  1. 记录有哪些服务器。

  2. 记录每一个主题的leader以及ISR。

  3. 辅助leader选举的controller。

在每个kafka实例启动后,都会向zookeeper注册broker,随后开始选择controller,按照先来后到的原则,谁先进行注册,哪个broker就会被选举为controller。

**Controller是什么?**controller是一个特殊的broker,一个集群中只有一个controller,由zookeeper辅助选举,如果当前controller宕机,kafka通过zookeeper监控controller的状态,此时,zookeeper会重新辅助选举新的controller。

同时controller负责监听brokers的节点变化,负责每个分区partition的leader的选举,每次某个broker宕机或者加入时,都会进行重新选举,在选举一个新的leader之后,Controller就会将这些信息上传到zookeeper,此时,还会将这些信息同步给其他节点,以便于controller挂掉之后,其他节点可以随时进行选举新的controller。

他还负责维护分区副本的管理,确保同步机制正常运行,维护kafka元数据,包括主题分区副本等信息,也负责处理topic和partition的创建删除和修改,同时,由于Controller仅仅负责管理,所以他的变更并不会影响到集群的正常运行,但是频繁的变更会影响kafka的性能。

在我们的生产者向其中发送信息的时候,follower会同步leader的信息,底层采用的是log的方式存储这些信息,log的底层说segment(以1个G为单位),为了实现快速查找,里面还有index索引的概念,利用索引来进行检索。

节点的服役与退役,在我们的节点服役和退役的时候,partition并不会自动的进行调整,而是需要我们手动进行负载均衡,具体步骤如下

  1. 首先需要在容器内创建一个topics.json文件,输入以下内容,但是通常容器内置没有文本编辑器,这个时候通过echo命令写入就可以了。

    {
      "topics": [
        {"topic": "cluster_test_topic"} 
      ],
      "version": 1
    }
  2. 随后执行kafka-reassign-partitions.sh --bootstrap-server kafka-1:9092 --topics-to-move-json-file topics.json --broker-list "1,2,3" --generate,相关参数可能需要根据实际情况修改,然后终端就会输出,当然,这里的kafka仅仅是提供了一种分配方法,实际上是可以自定义的。

    {
      "version": 1,
      "partitions": [
        {"topic": "cluster_test_topic", "partition": 0, "replicas": [1], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 1, "replicas": [2], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 2, "replicas": [3], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 3, "replicas": [1], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 4, "replicas": [2], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 5, "replicas": [3], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 6, "replicas": [1], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 7, "replicas": [2], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 8, "replicas": [3], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 9, "replicas": [1], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 10, "replicas": [2], "log_dirs": ["any"]},
        {"topic": "cluster_test_topic", "partition": 11, "replicas": [3], "log_dirs": ["any"]}
      ]
    }

    形如这样的信息,将其echo到reassign.json中即可。

  3. 随后执行kafka-reassign-partitions.sh --bootstrap-server kafka-1:9092 --reassignment-json-file reassign.json --execute命令执行分配。

  4. 最后kafka-reassign-partitions.sh --bootstrap-server kafka-1:9092 --reassignment-json-file reassign.json --verify查看,如果所有分区都已经被正确分配,那么就算完成了!

    这里如果是新增节点的话,肯定需要在一开始就新增节点,但是如果是退役节点的话,一般是在分配完成之后进行退役。

除此之外,还需要提到副本的问题,副本是什么?副本就是一个分区partition的备份,一个partition会有leader和follower,当leader挂掉的时候,我们就会选举一个副本成为leader,这样就保证了高可用性,但是副本不宜过多,当我们副本过多,主从同步就需要更多的时间和磁盘资源来继续你同步,并且占用的空间大小也会增大,增加了系统资源的损耗和延迟,而当我们使用waitforall级别的可靠性,延迟就会更加明显。

我们第一次手动分配分区的时候,如果没有执行副本数量,那么就不会分配副本!

所以我们需要在创建之初就指定副本数量kafka-topics.sh --bootstrap-server kafka-1:9092 --create --topic cluster_test_topic --partitions 12 --replication-factor 3,当然,如果在创建的时候忘记了,也没关系,步骤和重新分配分区的流程是一样的,区别就是在第二步的时候,我们可以将kafka给定的json自定义;

{
    "version": 1,
    "partitions": [
        {"topic": "cluster_test_topic", "partition": 0, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 1, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 2, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 3, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 4, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 5, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 6, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 7, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 8, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 9, "replicas": [1, 2, 3], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 10, "replicas": [2, 3, 1], "log_dirs": ["any", "any", "any"]},
        {"topic": "cluster_test_topic", "partition": 11, "replicas": [3, 1, 2], "log_dirs": ["any", "any", "any"]}
    ]
}

将replica的参数进行修改,这样就可以正确的分配副本了!然后execute,就完成了。

既然提到了副本,我们还需要引入一下leader选举的一些知识,一般来说,我们手动通过命令行分配副本或者partition,会默认采取负载均衡的策略,但是如果是在一些节点宕掉的时候,然后进行选举,就会采取抢占式的选举leader,这就可能会导致某一个broker负载过大,broker集群的负载不均衡,而我们可以通过重新执行replica的再分配或者定期执行kafka-leader-election.sh --bootstrap-server kafka-1:9092 --election-type PREFERRED --all-topic-partitions这个命令来实现负载均衡,同时这个命令也可以用于重启后恢复原本的分区状态;另外,还有几个参数可以解决这个问题:auto.leader.rebalance.enable默认为true,自动平衡,leader.imbalance.per.broker.percentage表示每个broker允许的不平衡的leader的比率,超出就会触发平衡机制,leader.imbalance.check.interval.seconds检查leader是否负载平衡的间隔时间。

另外,我们的生产者只会将数据发送给Leader,然后follower会与leader发送同步请求,如果长时间follower没有向leader通信或者发送同步请求,就会被踢出ISR,这个时间阈值参数是replica.lag.time.max.ms,而OSR表示同步过程中延迟过多的副本,replicas表示所有存储副本的节点,而ISR表示所有保持同步的节点,也就是说,如果机器挂掉,ISR会将这个节点移除,但是replicas不会!

follower故障:在leader和follower同步的时候,LEO是每个副本的最后一个offset + 1,而HW则是所有副本中最小的LEO,也就是说,所有的Follower的LEO虽然不一定一样,但是HW是一样的,HW也就是(High WaterMark)高水位线,当其中一个follower挂掉之后,会被踢出ISR,之后,其他的follower和leader会继续同步,维护一个HW,当之后,follower恢复了,此时会舍去挂掉的时候记录的HW之后的数据,然后重新开始同步,直到达到HW,就可以再次加入ISR。

leader故障:leader挂掉之后,会重新选拔一个新的leader,同时,leader和follower的数据,超过HW的部分都会被舍去,保证数据一致性,但是无法保证数据不丢失或者不重复。

文件存储

  • Topic是逻辑上的概念,而Partition是物理上的概念,每个partition都对应一个log文件,其中存储的是生产者生产的数据,但是为了防止log文件过大,导致搜索效率低,每个partition的log又被分成了多个segment,单位为1个G,每个segment包括.index(索引),.log(存储数据),.timeindex文件(时间戳索引,辅助定期删除),值得注意的是,index并不是为每一条数据都设置了索引,而是使用了稀疏索引,默认每写入4kb数据,会往index文件写入一条索引,可以通过log.index.interval.bytes修改,同时index中保存的offset为相对的offset,这样既可以执行查找的功能,也可以节省内存,防止offset过大。

  • kafka中的日志默认保存时间是七天,七天一到,就可以通过delete或者compact策略进行日志清理,默认是基于时间的删除delete策略,以segment所有记录中最大时间戳作为该文件的时间戳,以此为基准执行删除。另一种是基于大小的删除策略,超过设置的所有日志的总大小,删除最早的segment,类似LRU机制;而压缩日志compact策略则是将所有的Key相同的数据,只保留最新的Key,这样来压缩,类似redis的AOF重写。

  • Kafka能做到高效的读写数据,原因如下:

    1. 本身为分布式集群,可以采取分区技术,并行度高。

    2. 读数据采取稀疏索引,可以快速定位要消费的数据。

    3. 顺序写磁盘,生产者生产数据以追加写的形式写入到log文件,相较于随机写,顺序写之所以快。是因为省去了大量磁头寻址的时间。

    4. Kafka采取了页缓存和零拷贝技术,页缓存就是生产者将数据发送时,先将数据写道内存页中,然后由操作系统内核决定何时刷新到磁盘,这样就不会在写入的时候触发磁盘I/O,同时,如果consumer读取数据,会先从页缓存中找,找不到再去磁盘中寻找,就减少了频繁的磁盘I/O,提高了读写效率;零拷贝,Kafka直接调用sendfile()让数据从页缓存直接发送到TCP socket,而不需要走用户态将数据交给应用层,再通过向下传输将数据发送给TCP socket,简单来说,零拷贝允许数据直接在内核空间传递,而减少了用户态和内核态数据的来回拷贝和切换,提高了读写效率。

1.3 消费者

Kafka的消费方式是什么?

一般来说,消息队列有两种消费方式,pull拉模式和push推模式,而kafka采取的是拉模式

拉取就是consumer主动从broker中拉取数据,这样,每个consumer可以根据自己的处理能力去拉去相应数据量大小的数据,保证了每个consumer的消费能力被充分利用

而推模式,为什么kafka不采取这种形式?因为推模式中,消息的发送速率由broker决定,很难适应所有消费者的消费速率,比如如果推送速度过低,由于消费者消费能力参差不齐,就会导致部分consumer的能力没法充分利用,但是如果推送速度稍微大一点,一些consumer由来不及处理消息。

但是即便如此,拉模式依旧有自己的缺点,当kafka没有数据的时候,消费者可能会陷入循环,一直返回空数据。

工作流程

每个独立的消费者都可以去消费数据,并且可以重复消费其他消费者消费的数据,但是,如果这两个消费者位列同一个消费者组中,则这个消费者组会被视为一个"消费者",通俗来将,就是消费者组只能对同一份数据消费一次,也就是说,同一份数据不能被消费者组中的消费者消费两次,并且,每个消费者都会被分配一个partition,让他们去特定的partition去消费数据,同一个消费者组中的两个消费者不能同时消费一个partition。

另外,为防止kafka节点或者消费者挂掉后,消费者不知道上一次消费某个partition消费哪里了,最新版的kafka中还维护了一个__consumer_offsets来保存消费者消费到的数据的偏移量,而老版本的kafka将这个信息维护在zookeeper中,而维护在kafka主题中,方便管理维护,也减少了通信的时间消耗。

形成一个消费者组的条件是:消费者的GroupID相同,当然如果partition的数量超过了消费者组中的消费者的数量,被空出来的消费者不会参与消费。

消费者是如何实现分区的分配的?首先有一个coordinator,用来辅助消费者的初始化和分区的分配。最开始会从__consumer_offsets中,通过消费者组的id进行哈希计算,选择一个partition来存储消费者的offset数据,而负责管理这个partition的broker,也就成为了这个消费者组的coordinator,而选举完成之后,所有的consumer都会向这个coordinator发送JoinGroup请求,而coordinator又会从这些consumer中选举一个作为消费者组的leader,此后,coordinator会把要消费的topic情况发送给leader消费者,随后leader会制定一个消费方案,这里就涉及到一个分区分配的策略,随后就将分配的方案发送给每个consumer,然后进行消费,而且每个消费者都会定期和coordinator保持心跳机制,一旦超时,就会被移除。并且会触发再平衡(重新分配消费任务)。(当某个消费者消费过慢,也会触发)

消费者组是怎么消费的?

首先我们需要创建一个用于访问kafka集群的消费者客户端,这个客户端当然也有config配置,和生产者客户端类似,有每批次的拉取大小,拉取数据的超时时间,每批次最大拉取大小的信息,当拉取消息时,在kafka端会返回数据并放入一个拉取队列(缓冲区)中,随后经过拦截器和反序列化器,最终将消息返回到客户端,当然,处理完消息后,还需要提交offset偏移量(分为手动和自动),告诉kafka集群当前消息已经被该消费者组消费。

下面是go-Sarama消费者客户端代码:

// ConsumerGroupHandler 实现了 ConsumerGroupHandler 接口
type ConsumerGroupHandler struct{}

// Cleanup implements sarama.ConsumerGroupHandler.
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	color.Green("消费者关闭!\n")
	return nil
}

// Setup implements sarama.ConsumerGroupHandler.
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	color.Green("消费者启动!\n")
	return nil
}

// 消费消息并打印
func (h *ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		fmt.Printf("Consumed message: %s\n", string(message.Value))
		// 手动提交偏移量
		sess.MarkMessage(message, "")
       sess.Commit()
	}
	return nil
}

var _ sarama.ConsumerGroupHandler = (*ConsumerGroupHandler)(nil)

func main() {
	defer func() {
		if err := recover(); err != nil {
			color.Red("Error: %v", err)
		}
	}()
	brokers := []string{"localhost:29092", "localhost:29093", "localhost:29094"}
	topic := "cluster_test_topic"

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
    //保持偏移量是最新的位置
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	//初始化消费者组
	ConsumerGroup, err := sarama.NewConsumerGroup(brokers, "my-group", config)
	if err != nil {
		panic(err)
	}
    //此处采取for循环是因为在kafka的rebalance之后,consume会返回错误导致无法继续消费
    //所以此处需要采取循环。
	go func() {
		for {
            //指定topic
			err = ConsumerGroup.Consume(context.Background(), []string{topic}, &ConsumerGroupHandler{})
			if err != nil {
				color.Red("Error from consumer: %v", err)
			}
		}
	}()
	defer ConsumerGroup.Close()
	select {}
}

而如果想要消费特定分区,则不能采取consumerGroup的形式,而是单独使用Consumer,然后调用ConsumePartition,同样的,生产者也可以配置字段单独向一个partition发送信息。

分区策略

kafka中自带的分区策略有Range,Roundrobin,Sticky,CooperativeSticky,而Kafka可以同时使用多个分区分配策略,在Sarama客户但可以通过调整config.Consumer.Group.Rebalance.Strategy来修改采取的分区策略,默认是Range+CooperativeSticky

  • Range:范围分配策略,针对于每个主题对每个分区和每个消费者进行编号排序,然后用消费者去对应每一个partition,总体来说就是(四个分区,三个消费者):

    Partition1 <-> Consumer1

    Partition2 <-> Consumer2

    Partition3 <-> Consumer3

    Partition4 <-> Consumer1

    虽然只针对一个topic而言,编号较低的Consumer可能消耗不大,但是如果对于上百个topic而言,低位的Consumer就多承担上百个partition,容易造成数据倾斜!

  • RoundRobin:轮询分配策略,roundrobin是针对于所有的消费者订阅的topic而言,将所有的partition和consumer排序,然后按照range的轮询方法将partition分配给消费者。、

    以上两种在Rebalance时,都会重新分配所有的分区。

  • Sticky:粘性分配策略,随机且均匀,初始分配时尽量负载均衡,但是在重分配时,会尽可能保留原有的分区分配,而仅仅调整部分的分区分配,这样可以减少分区迁移的开销,但是实现比较复杂。

offset

每个消费者组为了记录每个partition消费到了什么位置,都需要记录offset的位置,而这个offset在旧版本的kafka是存储在zookeeper里面的,在新版本的Kafka中,是存在__Conustmer_offsets的主题中的,里面有50个partition,而在这个topic中,是采取key-value的格式存储offset的值,key是groupid + topic +分区号,value对应的则是offset,同时,每隔一段时间,kafka内部还对这个topic进行compact压缩,这样能够保存最新的数据。

  • 自动保存:kafka提供了自动提交offset的功能,以便于我们能够专注于自己的业务逻辑,配置参数为:enable.auto.commit以及auto.commit.interval.ms表示是否开启自动提交功能,自动提交offset的时间间隔,默认开启和5s,在Sarama客户端中,对应的字段为:

    config.Consumer.Offsets.AutoCommit.Enable
    config.Consumer.Offsets.AutoCommit.Interval

    虽然很方便,但是缺点很明显,如果在还没有提交的时候,但是此时消费者挂了,就会导致重复消费!因此,我们的kafka也提供了手动提交的功能。

  • 手动提交:手动提交又分为同步和异步,通过手动提交,我们能够更好地控制offset的提交,通常我们是采取异步提交的方式来手动提交offset,但是Sarama库似乎并没有直接封装异步提交的API,需要我们去手动实现,而kafka-go这个包貌似是支持的。

  • 指定Offset:在Sarama中,可以通过设置config.Consumer.Offsets.Initial这个字段值,来设置我们此次消费的起始位置,默认是从最新的offset进行消费的。当然,此处只能指定分区和指定offset才能够使用,既然是指定offset,当然也可以通过执行时间戳来进行查找,在Sarama中,我们需要通过sarama.NewClient(brokers, config)创建一个client,然后调用client.GetOffset(topic, partition, targetTime)来获取当前时间戳的offset,随后执行执行消费操作,当然你也可以通过遍历topic的所有分区来实现在某一时间段之后的所有消息的消费。

  • 重复消费和漏消费:一般来说,自动提交offset会引起重复消费,而在自动提交的间隔期间,consumer挂掉了,重启就会出现重复消费的情况,同时,手动提交也可能引起重复消费,比如说,提交offset的时候,网络故障或者kafka宕机了,kafka就无法接受提交的offset,就会导致重复消费,而漏消费则是在消费消息之前提交了offset,如果在处理业务的时候崩溃,那么此时offset已经提交,就无法重新进行笑飞了,造成漏消费的情况,而不管自动提交还是手动提交都会有这种情况,所以一般来说是采取先处理完业务,再手动提交的方式。

    而一般来说,我们的解决方案就是采取事务的方式去处理这个问题,当然,这也要求下游的消费者,如MySQL,支持事务,否则是做不到事物的回滚的

  • 消息积压:消息积压就是说,消费的速度小于生产的速度,而我们在kafka中的数据滞留过久,就会被删除,所以我们需要考虑去提高消费者的消费能力:

    1. 消费能力不足,可以增加分区,同时增加消费者的数量

    2. 如果是数据处理不及时,可以提高每批次拉取消息的数量,批次拉取的数量过少,也会导致数据积压,同时我们在提高每批次拉取消息的数量的时候,也需要提高每批次拉取的数据大小。

1.4 调优

生产者调优

  • linger.ms可以调整每批次最长发送消息的间隔

  • batch.size可以调整每批次发送的消息的最大值

  • config.ChannelBufferSize缓冲区的总大小,较大的缓冲区可以提高吞吐率,但是会增加内存占用,如果缓冲区较小,可能会导致生产者阻塞,从而吞吐量降低。

  • 幂等性:开启幂等性可以使得在broker中缓存的五个请求不会乱序,或者说将broker缓存的最大数据量设置成1(效率低下)。

  • retry:重试的次数,默认是int的最大值,如果不希望一直重试,可以自己手动调小。

  • retry间隔时间:默认100ms。

  • 回应方式:之前提过有0,1,-1三者等待方式,0就是直接发送出去就不管了,1则是消息在leader上面落盘之后返回消息,-1是最可靠的,也就是waitforall,等待leader和follower全部同步完成才会返回ack,当然这里也有关于exactly once的笔记,上面有详细的介绍。

  • 压缩方式:默认为none,一般会配置成snappy这种比较轻量的压缩方式,用于提高吞吐量。

Broker调优

  • replica.lag.time.max.ms:表示ISR中Follower未向Leader同步或通信被踢出的时间。

  • auto.leader.rebalance.enable:Leader Partition的自动平衡,默认关闭,除非节点经常挂,否则不建议打开,相关的还有超过一定百分比触发自动平衡以及定期检查是否平衡的参数。

  • segment大小:默认1G.

  • log.index.interval.bytes:默认4kb,每写入4kb就会添加一个索引。

  • 数据保存时间:默认七天,相关的还有检查是否超时的时间间隔

  • 删除策略:默认delete,如果为compact,则会采取压缩策略。

...还有一堆读写拉传输的线程数,以及强制页缓存刷写到磁盘的条数以及刷写数据到磁盘的时间间隔。

另外一些,就是扩展分区数,调整分区副本的存储,手动负载均衡。还有自动创建主题,虽然在测试环境中可以随便开启,但是在生产环境一般是关闭的,防止出现未知的乱七八糟的topic,并且自动创建的主题,partition一般都是默认值,不如手动创建。

消费者调优

在一个消费者组中,首先会通过哈希计算,计算出自己的哈希值,然后对应__consumer_offsets这个特殊分区上面的特殊的partition,然后将管理这个partition的broker设置为这个消费者组的coordinator,辅助这个消费者组的初始化和分区的分配,然后每个消费者向这个coordinator注册自己的信息,表示要加入这个group,然后coordinator会选择一个消费者作为leader,然后将要消费的topic信息发给leader,由leader去执行分区分配,将分区分配的方案发给coordinator,随后coordinator下发给这个消费者组的所有consumer,在这个过程中,每个消费者都会保持心跳,超过这个时间,就会踢出消费者组,并触发再分配,或者消费时间过长,也会触发再分配。

  • Fetch.min.bytes:每批次的最小抓取数据大小,默认一字节。

  • fetch.max.wait.ms:一批数据的最长的等待时间,超出这个之间就会拉取数据。

  • fetch.max.bytes:每批次的最大抓取大小,

  • max.poll.recodes:每次拉取数据的最大跳数。

  • auto.commit:自动提交,如果追求exactly once,默认开启

  • session.time.out:消费者和coordinator的超时时间。

数据精准一次如何实现?

生产者讲ack生成-1,同时开启幂等性和事务,broker角度分区副本数量大于等于二,ISR应答的最小副本数量大于等于2,消费者角度,开启事务,并且手动提交offset,并且输出的目的地必须支持事务,如MySQL。

QQ_1742437593900