0%


  1. 单独下拉列表
  2. 级联列表
  3. 自动填充
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import org.apache.poi.hssf.usermodel.DVConstraint;
import org.apache.poi.hssf.usermodel.HSSFDataValidation;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.ss.util.CellRangeAddressList;

import java.io.IOException;
import java.util.*;

/**
* ExcelUtil
*
* @author maxzhao
* @date 2021-03-24 14:51
*/
public class BootExcelUtil {

/**
* 给sheet页,添加下拉列表
*
* @param workbook excel文件,用于添加Name
* @param targetSheet 级联列表所在sheet页
* @param options 级联数据 ['百度','阿里巴巴']
* @param column 下拉列表所在列 从'A'开始
* @param fromRow 下拉限制开始行
* @param endRow 下拉限制结束行
*/
public static void addValidationToSheet(Workbook workbook, Sheet targetSheet, Object[] options, char column, int fromRow, int endRow) {
String hiddenSheetName = "sheet" + workbook.getNumberOfSheets();
Sheet optionsSheet = workbook.createSheet(hiddenSheetName);
String nameName = column + "_parent";
int rowIndex = 0;
Cell cellTemp;
Row rowTemp;
int columnIndex = 0;
for (Object option : options) {
columnIndex = 0;
rowTemp = optionsSheet.createRow(rowIndex++);
cellTemp = rowTemp.createCell(columnIndex++);
cellTemp.setCellValue(option.toString());
}
createName(workbook, nameName, hiddenSheetName + "!$A$1:$A$" + options.length);
DVConstraint constraint = DVConstraint.createFormulaListConstraint(nameName);
CellRangeAddressList regions = new CellRangeAddressList(fromRow, endRow, (int) column - 'A', (int) column - 'A');
targetSheet.addValidationData(new HSSFDataValidation(regions, constraint));
}

/**
* 给sheet页 添加级联下拉列表
*
* @param workbook excel
* @param targetSheet sheet页
* @param options 要添加的下拉列表内容 , keys 是下拉列表1中的内容,每个Map.Entry.Value 是对应的级联下拉列表内容
* @param keyColumn 下拉列表1位置
* @param valueColumn 级联下拉列表位置
* @param fromRow 级联限制开始行
* @param endRow 级联限制结束行
*/
public static void addValidationToSheet(Workbook workbook, Sheet targetSheet, Map<String, List<String>> options, char keyColumn, char valueColumn, int fromRow, int endRow) {
String hiddenSheetName = "sheet" + workbook.getNumberOfSheets();
Sheet hiddenSheet = workbook.createSheet(hiddenSheetName);
List<String> firstLevelItems = new ArrayList<>();

int rowIndex = 0;
DVConstraint constraintTemp;
Cell cellTemp;
Row rowTemp;
String parentTemp;
for (Map.Entry<String, List<String>> entry : options.entrySet()) {
parentTemp = formatNameName(entry.getKey());
firstLevelItems.add(parentTemp);
List<String> children = entry.getValue();

int columnIndex = 0;
rowTemp = hiddenSheet.createRow(rowIndex++);

for (String child : children) {
cellTemp = rowTemp.createCell(columnIndex++);
cellTemp.setCellValue(child);
}

char lastChildrenColumn = (char) ((int) 'A' + children.size() - 1);
createName(workbook, parentTemp, String.format(hiddenSheetName + "!$A$%s:$%s$%s", rowIndex, lastChildrenColumn, rowIndex));
constraintTemp = DVConstraint.createFormulaListConstraint("INDIRECT($" + keyColumn + "1)");
CellRangeAddressList regions = new CellRangeAddressList(fromRow, endRow, valueColumn - 'A', valueColumn - 'A');
targetSheet.addValidationData(new HSSFDataValidation(regions, constraintTemp));
}
addValidationToSheet(workbook, targetSheet, firstLevelItems.toArray(), keyColumn, fromRow, endRow);
}

/**
* 根据用户在keyColumn选择的key, 自动填充value到valueColumn
*
* @param workbook excel
* @param targetSheet sheet页
* @param keyValues 匹配关系 {'百度','www.baidu.com'},{'淘宝','www.taobao.com'}
* @param keyColumn 自动填充 要匹配的列(例如 网站中文名称)
* @param valueColumn 匹配到的内容列(例如 网址)
* @param fromRow 下拉限制开始行
* @param endRow 下拉限制结束行
*/
public static void addAutoMatchValidationToSheet(Workbook workbook, Sheet targetSheet, Map<String, String> keyValues, char keyColumn, char valueColumn, int fromRow, int endRow) {
String hiddenSheetName = "sheet" + workbook.getNumberOfSheets();
Sheet hiddenSheet = workbook.createSheet(hiddenSheetName);

Cell cellTemp;
Row rowTemp;
// init the search region(A and B columns in hiddenSheet)
int rowIndex = 0;
for (Map.Entry<String, String> kv : keyValues.entrySet()) {
rowTemp = hiddenSheet.createRow(rowIndex++);

cellTemp = rowTemp.createCell(0);
cellTemp.setCellValue(kv.getKey());

cellTemp = rowTemp.createCell(1);
cellTemp.setCellValue(kv.getValue());
}
String keyCellTemp;
String formulaTemp;
for (int i = fromRow; i <= endRow; i++) {
Row totalSheetRow = targetSheet.getRow(i);
if (totalSheetRow == null) {
totalSheetRow = targetSheet.createRow(i);
}
cellTemp = totalSheetRow.getCell((int) valueColumn - 'A');
if (cellTemp == null) {
cellTemp = totalSheetRow.createCell((int) valueColumn - 'A');
}
keyCellTemp = String.valueOf(keyColumn) + (i + 1);
formulaTemp = String.format("IF(ISNA(VLOOKUP(%s,%s!A:B,2,0)),\"\",VLOOKUP(%s,%s!A:B,2,0))", keyCellTemp, hiddenSheetName, keyCellTemp, hiddenSheetName);
cellTemp.setCellFormula(formulaTemp);
}
// init the keyColumn as comboList
addValidationToSheet(workbook, targetSheet, keyValues.keySet().toArray(), keyColumn, fromRow, endRow);
}

private static Name createName(Workbook workbook, String nameName, String formula) {
Name name = workbook.createName();
name.setNameName(nameName);
name.setRefersToFormula(formula);
return name;
}

/**
* 隐藏excel中的sheet页
*
* @param workbook
* @param start 需要隐藏的 sheet开始索引
*/
private static void hideTempDataSheet(HSSFWorkbook workbook, int start) {
for (int i = start; i < workbook.getNumberOfSheets(); i++) {
workbook.setSheetHidden(i, true);
}
}

/**
* 不可数字开头
*
* @param name
* @return
*/
static String formatNameName(String name) {
name = name.replaceAll(" ", "").replaceAll("-", "_").replaceAll(":", ".");
if (Character.isDigit(name.charAt(0))) {
name = "_" + name;
}

return name;
}

/**
* poi
* 测试
*
* @param args
* @throws IOExceptionjava
*/
public static void main(String[] args) throws IOException {

//demo 单独下拉列表
//BootExcelUtil.addValidationToSheet(workbook, sheet, new String[]{"百度", "阿里巴巴"}, 'C', 1, 200);
//demo 级联下拉列表,一级不需要设置单独下拉
Map<String, List<String>> data = new HashMap<>();
data.put("百度系列", Arrays.asList("百度地图", "百度知道", "百度音乐"));
data.put("阿里系列", Arrays.asList("淘宝", "支付宝", "钉钉"));
//BootExcelUtil.addValidationToSheet(workbook, sheet, data, 'A', 'B', 1, 200);
//demo 自动填充
Map<String, String> kvs = new HashMap<>();
kvs.put("百度", "www.baidu.com");
kvs.put("阿里", "www.taobao.com");
//BootExcelUtil.addAutoMatchValidationToSheet(workbook, sheet, kvs, 'D', 'E', 1, 200);
}
}

本文地址: https://github.com/maxzhao-it/blog/post/1504/

pacman升级时,有不少软件不需要升级或是升级后不稳定,可以用下面方式忽略升级

忽略升级

/etc/pacman.conf 中查找 #IgnorePkg =

后面添加不想升级的软件包

  1. IgnorePkg 软件包名称
  2. IgnoreGroup 软件包组名称

常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
pacman -Sy abc         #和源同步后安装名为abc的包
pacman -S abc #从本地数据库中得到abc的信息,下载安装abc包
pacman -Sf abc #强制安装包abc
pacman -Ss abc #搜索有关abc信息的包
pacman -Si abc #从数据库中搜索包abc的信息
pacman -Q # 列出已经安装的软件包
pacman -Q abc # 检查 abc 软件包是否已经安装
pacman -Qi abc #列出已安装的包abc的详细信息
pacman -Ql abc # 列出abc软件包的所有文件
pacman -Qo /path/to/abc # 列出abc文件所属的软件包
pacman -Syu #同步源,并更新系统
pacman -Sy #仅同步源
pacman -Su #更新系统
pacman -R abc #删除abc包
pacman -Rd abc #强制删除被依赖的包
pacman -Rc abc #删除abc包和依赖abc的包
pacman -Rsc abc #删除abc包和abc依赖的包
pacman -Sc #清理/var/cache/pacman/pkg目录下的旧包
pacman -Scc #清除所有下载的包和数据库
pacman -U abc #安装下载的abs包,或新编译的abc包
pacman -Sd abc #忽略依赖性问题,安装包abc
pacman -Su --ignore foo #升级时不升级包foo
pacman -Sg abc #查询abc这个包组包含的软件包

本文地址: https://github.com/maxzhao-it/blog/post/26523/

Docker安装可以参考:

Manjaro安装Docker

Centos7 安装 docker

一、创建 docker 节点

拉取centos镜像

1
docker pull centos

这里如果没有权限,则参考 Manjaro安装Docker 中的分组操作。

查看镜像

1
docker images

创建几个容器,作为greenplum的节点

1
2
3
4
5
# 用 exit退出
docker run -it --name gp-master centos /bin/bash
docker run -it --name gp-master1 centos /bin/bash
docker run -it --name gp-master2 centos /bin/bash
docker run -it --name gp-master3 centos /bin/bash

注意:exit 退出后,容器停止(Exited)

命令解释:为centos这个镜像创建一个容器

  • run: 在新的容器中运行命令 . run=create + start
  • -it : -i-t, 为该docker创建一个伪终端,这样就可以进入到容器的交互模式
  • --name gp-master 容器名称
  • centos: 镜像
  • */bin/bash :表示启动容器后启动bashdocker中必须要保持一个进程的运行,要不然整个容器启动后就会马上kill itself

查看帮助:docker run --help

守护态运行,通过 run后加-d实现

二、配置基础环境

进入每个greenplum节点,配置基础环境

由于dockercentos镜像是centos的简化版本,里面有很多包是没有安装的,会影响到后面部署greenplum,因此在docker的每个节点中安装相关的依赖包

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看正在运行的容器
docker ps -s
# 查看所有容器
docker ps -a
# 启动
docker start gp-master
docker start gp-master1
docker start gp-master2
docker start gp-master3
# 停止容器
# docker stop gp-master
# 进入容器
docker exec -it gp-master /bin/bash
  • 安装相关的依赖包
1
yum install -y net-tools which openssh-clients openssh-server less zip unzip iproute
  • 启动ssh

docker中默认没有启动ssh,为了方便各节点之间的互连,创建相关的认证key,并启动docker的每个节点里面的ssh

1
2
3
4
ssh-keygen -t rsa -f /etc/ssh/ssh_host_rsa_key
ssh-keygen -t ecdsa -f /etc/ssh/ssh_host_ecdsa_key
ssh-keygen -t ed25519 -f /etc/ssh/ssh_host_ed25519_key
/usr/sbin/sshd
  • 修改/etc/hosts文件

在每个docker节点中添加如下配置,方便后续greenplum集群的配置文件中用到,ip为各个docker节点中的ip地址

1
2
3
4
5
6
7
# 查看
cat /etc/hosts
# 写入
echo "172.17.0.2 dw-greenplum-1 mdw" >> /etc/hosts
echo "172.17.0.3 dw-greenplum-2 sdw1" >> /etc/hosts
echo "172.17.0.4 dw-greenplum-3 sdw2" >> /etc/hosts
echo "172.17.0.5 dw-greenplum-4 sdw3" >> /etc/hosts

同时修改所有节点里面的/etc/sysconfig/network文件,保持与主机名一致

1
2
3
4
5
6
7
8
9
# 查看
cat /etc/sysconfig/network
# 不存在,则写入
echo "NETWORKING=yes" >> /etc/sysconfig/network
echo "HOSTNAME=mdw" >> /etc/sysconfig/network
# 存在,则修改
vi /etc/sysconfig/network
NETWORKING=yes
HOSTNAME=mdw
  • 创建greenplum的用户和用户组

为了方便安装greenplum集群,且使greenplum自带的python不与系统的python版本相冲突,在每个节点中创建greenplum的用户和用户组

1
2
3
4
5
groupadd -g 530 gpadmin
useradd -g 530 -u 530 -m -d /home/gpadmin -s /bin/bash gpadmin
chown -R gpadmin:gpadmin /home/gpadmin
# 这里可能会提示 bash: passwd: command not found
passwd gpadmin
  • 修改每个节点上的文件打开数量限制
1
2
3
4
5
6
cat /etc/security/limits.conf
# 不存在,则写入, 存在,则用 vi 修改
echo "soft nofile 65536" >> /etc/security/limits.conf
echo "hard nofile 65536" >> /etc/security/limits.conf
echo "soft nproc 131072" >> /etc/security/limits.conf
echo "hard nproc 131072" >> /etc/security/limits.conf
  • 关闭每个节点上的防火墙,关闭selinux
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 最新的centos好像这些都不需要操作
service iptables stop
chkconfig iptables off
vi /etc/selinux/config
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted

三、下载greenplum安装包

greenplum的官网上,下载greenplum安装包
GitHub下载,点开Greenplum Database Server,根据自己的操作系统下载安装包,我下载当前最新的 greenplum-db-6.15.0-rhel7-x86_64.rpm,将其拷到master节点的/home/gpadmin目录中

版本快速链接: github:open-source-greenplum-db-6.15.0-rhel7-x86_64.rpm

四、在master节点上安装greenplum

切换到gpadmin用户

1
2
su gpadmin
cd ~/

传递文件到docker

1
2
docker ps 
docker cp /home/maxzhao/Downloads/open-source-greenplum-db-6.15.0-rhel7-x86_64.rpm 348d61c65324:/home/gpadmin/open-source-greenplum-db-6.15.0-rhel7-x86_64.rpm

Rpm 安装

1
yum localinstall /home/gpadmin/open-source-greenplum-db-6.15.0-rhel7-x86_64.rpm -y
1
2
3
4
5
# 查看安装地址
[root@3b2ae1fbe58b /]# whereis greenplum-db
greenplum-db: /usr/local/greenplum-db
[root@3b2ae1fbe58b /]# ls -l /usr/local/greenplum-db
lrwxrwxrwx 1 root root 30 Apr 20 11:14 /usr/local/greenplum-db -> /usr/local/greenplum-db-6.15.0

gpssh-exkeys 报错问题:

cd /usr/bin

mv python python.bak

解压安装(ZIP)

解压下载后的zip文件

1
unzip greenplum-db-5.10.2-rhel7-x86_64.zip

执行安装文件

1
./greenplum-db-5.10.2-rhel7-x86_64.bin

安装期间需要配置安装目录,输入/home/gpadmin/greenplum-db-5.10.2

为了方便安装集群,greenplum提供了批量操作节点的命令,通过指定配置文件使用批处理命令

1
2
3
4
5
6
7
8
9
10
11
cd ~/ ;mkdir conf
echo mdw > ./conf/hostlist
echo sdw1 >> ./conf/hostlist
echo sdw2 >> ./conf/hostlist
echo sdw3 >> ./conf/hostlist
cat ./conf/hostlist

echo sdw1 > ./conf/seg_hosts
echo sdw2 >> ./conf/seg_hosts
echo sdw3 >> ./conf/seg_hosts
cat ./conf/seg_hosts

greenplum-db/greenplum_path.sh中保存了运行greenplum的一些环境变量,包括GPHOME、PYTHONHOME等,在gpadmin账号下设置环境变量,并将master节点的key
交换到各个segment节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[gpadmin@mdw ~]$ source /home/gpadmin/greenplum-db/greenplum_path.sh 
# 或者
[gpadmin@mdw ~]$ source /usr/local/greenplum-db/greenplum_path.sh
[gpadmin@mdw ~]$ gpssh-exkeys -f /home/gpadmin/conf/hostlist
[STEP 1 of 5] create local ID and authorize on local host

[STEP 2 of 5] keyscan all hosts and update known_hosts file

[STEP 3 of 5] authorize current user on remote hosts
... send to mdw
... send to sdw1
***
*** Enter password for sdw1:
... send to sdw2
... send to sdw3

[STEP 4 of 5] determine common authentication file content

[STEP 5 of 5] copy authentication files to all remote hosts
... finished key exchange with mdw
... finished key exchange with sdw1
... finished key exchange with sdw2
... finished key exchange with sdw3

[INFO** completed successfully

交换成功后,后续就可以使用一些命令执行批量操作

注意:使用gpssh-exkeys命令时一定要使用gpadmin用户,因为会在/home/gpadmin/.ssh中生成ssh的免密码登录秘钥,如果使用其它账号登录,则会在其它账号下生成密钥,在gpadmin
账号下就无法使用gpssh的批处理命令

1
2
3
4
5
6
7
8
9
10
11
12
[gpadmin@mdw ~]$ gpssh -f /home/gpadmin/conf/hostlist
=> pwd
[sdw1] /home/gpadmin
[sdw3] /home/gpadmin
[ mdw] /home/gpadmin
[sdw2] /home/gpadmin
=> ls
[sdw1]
[sdw3]
[ mdw] conf greenplum-db greenplum-db-5.10.2
[sdw2]
=> exit

pwd命令是linux中的查看路径命令,在这里也是查看批量操作时各个节点当前所在的路径,从中可以看到已经成功连通了4个节点

五、分发安装包到每个子节点

打包master节点上的安装包

1
[gpadmin@mdw ~]$ tar -czf gp.tar.gz greenplum-db-5.10.2

使用gpscp命令将这个文件复制到每个子节点

1
[gpadmin@mdw ~]$ gpscp -f /home/gpadmin/conf/seg_hosts gp.tar.gz =:/home/gpadmin

批量解压,并创建软链接

1
2
3
4
5
6
7
8
9
[gpadmin@mdw ~]$ gpssh -f /home/gpadmin/conf/seg_hosts
=> tar -zxf gp.tar.gz
[sdw3]
[sdw1]
[sdw2]
=> ln -s greenplum-db-5.10.2 greenplum-db
[sdw3]
[sdw2]
[sdw1]

这样就完成了所有子节点数据库的安装

六、初始化安装数据库

  • 批量创建数据目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[gpadmin@mdw ~]$ gpssh -f /home/gpadmin/conf/hostlist
=> mkdir gpdata
[sdw1]
[sdw3]
[ mdw]
[sdw2]
=> cd gpdata
[sdw1]
[sdw3]
[ mdw]
[sdw2]
=> mkdir gpmaster gpdatap1 gpdatap2 gpdatam1 gpdatam2
[sdw1]
[sdw3]
[ mdw]
[sdw2]
=> exit
  • 在master节点上修改.bash_profile配置环境变量,并发送给其他子节点,确保这些环境变量生效
1
2
3
4
5
6
7
8
9
10
[gpadmin@mdw ~]$ vi .bash_profile 
source /opt/gpadmin/greenplum-db/greenplum_path.sh
export MASTER_DATA_DIRECTORY=/home/gpadmin/gpdata/gpmaster/gpseg-1
export PGPORT=2345
export PGDATABASE=testDB
[gpadmin@mdw ~]$ source .bash_profile
[gpadmin@mdw ~]$ gpscp -f /home/gpadmin/conf/seg_hosts /home/gpadmin/.bash_profile
[gpadmin@sdw1 ~]$ source .bash_profile
[gpadmin@sdw2 ~]$ source .bash_profile
[gpadmin@sdw3 ~]$ source .bash_profile
  • 初始化配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[gpadmin@mdw ~]$ vi /home/gpadmin/conf/gpinitsystem_config
ARRAY_NAME="Greenplum"
MACHINE_LIST_FILE=/home/gpadmin/conf/seg_hosts

# Segment 的名称前缀
SEG_PREFIX=gpseg
# Primary Segment 起始的端口号
PORT_BASE=33000
# 指定 Primary Segment 的数据目录
declare -a DATA_DIRECTORY=(/home/gpadmin/gpdata/gpdatap1 /home/gpadmin/gpdata/gpdatap2)
# Master 所在机器的 Hostname
MASTER_HOSTNAME=mdw
# 指定 Master 的数据目录
MASTER_DIRECTORY=/home/gpadmin/gpdata/gpmaster
# Master 的端口
MASTER_PORT=2345
# 指定Bash的版本
TRUSTED_SHELL=/usr/bin/ssh
# Mirror Segment起始的端口号
MIRROR_PORT_BASE=43000
# Primary Segment 主备同步的起始端口号
REPLICATION_PORT_BASE=34000
# Mirror Segment 主备同步的起始端口号
MIRROR_REPLICATION_PORT_BASE=44000
# Mirror Segment 的数据目录
declare -a MIRROR_DATA_DIRECTORY=(/home/gpadmin/gpdata/gpdatam1 /home/gpadmin/gpdata/gpdatam2)
  • 初始化数据库
1
[gpadmin@mdw ~]$ gpinitsystem -c /home/gpadmin/conf/gpinitsystem_config -s sdw3

其中,-s sdw3是指配置master的standby节点,然后按照提示步骤就能完成安装了

如果gpinitsystem不成功,在master节点的/home/gpadmin/gpAdminLogs目录下gpinitsystem_*
.log文件中查看日志信息,找出原因进行修改,然后再重新执行gpinitsystem进行初始化安装。

本文地址: https://github.com/maxzhao-it/blog/post/47598/

Citus简介

Citus以插件的方式扩展到postgresql中,独立于postgresql内核,所以能很快的跟上pg主版本的更新,部署也比较简单,是现在非常流行的分布式方案。Citus在苏宁有大规模应用,微软也提供citus的商业支持。下面是citus的架构:

2018042723003371

Citus
节点主要分为协调节点和工作节点,协调节点不存储真实数据,只存储数据分布的元信息,实际的数据被分成若干分片,打散到不同worker节点中,应用连接协调节点,协调节点进行sql解析,生成分布式执行计划,下发到worker节点执行,cn将结果汇总返回客户端。

Citus 的主要架构特点如下:

①有两种表类型:参考表分布表,参考表每个协调节点和worker节点都有一份完整的副本,分布表则会打散分布到不同worker中。

②可以进行读写分离,如上图cn1为写节点,可以通过再增加多个cn读节点增加集群读的能力,写cn和读cn之间使用流复制进行元数据同步

③支持MX模式,可以将元数据也存在某些worker节点中,这样使得该worker节点能够直接提供写的能力,以此增加集群写的能力。

④底层worker节点可以通过流复制搭建副本,保证数据高可用。

⑤做join时最好的结果是能够将计算下推到worker节点,但是只有在参考表和其他表做join以及两个表的分布方式相同的情况下才能下推到worker计算,否则需要将数据拉到协调节点进行计算。

⑥整体架构类似mycat的中间件,因为没有全局事务管理,故不能保证数据的实时读一致性,但是性能上相比要好。数据写一致性使用2pc来保证。

来自 PostgreSQL的几种分布式架构对比

其它特性:

● PostgreSQL兼容

● 水平扩展

● 实时并发查

● 快速数据加载

● 实时增删改查

● 持分布式事务

● 支持常用DDL

交互

  1. 客户端应用访问数据时只和CN节点交互。
  2. CN收到SQL请求后,生成分布式执行计划,并将各个子任务下发到相应的Worker节点,之后收集Worker的结果,经过处理后返回最终结果给客户端。

Citus 性能参考(来自互联网)

为了能够直观的了解citus分片表的性能优势,下面在1个CN和8个worker组成citus集群上, 对比普通表和分片表(96分片)的性能差异。

img

参考表:分片表主要解决的是大表的水平扩容问题,对数据量不是特别大又经常需要和分片表Join的维表可以采用一种特殊的分片策略,只分1个片且每个Worker上部署1个副本,这样的表叫做“参考表”。

推荐的业务场景

1、实时数据分析

itus不仅支持高速的批量数据加载(20w/s),还支持单条记录的实时增删改查。
查询数据时,CN对每一个涉及的分片开一个连接驱动所有相关worker同时工作。并且支持过滤,投影,聚合,join等常见算子的下推,尽可能减少CN的负载。所以,对于count(),sum()
这类简单的聚合计算,在128分片时citus可以轻松获得和PostgreSQL单并发相比50倍以上的性能提升。

2、多租户

和很多分布式数据库类似,citus对分片表间join的支持存在一定的限制。而多租户场景下每个租户的数据按租户ID分片,业务的SQL也带租户ID。因此这些SQL都可以直接下推到特定的分片上,避免了跨库join和跨库事务。

多租户定义:多租户技术或称多重租赁技术,简称SaaS,是一种软件架构技术,是实现如何在多用户环境下(此处的多用户一般是面向企业用户)共用相同的系统或程序组件,并且可确保各用户间数据的隔离性
。简单讲:在一台服务器上运行单个应用实例,它为多个租户(客户)提供服务。从定义中我们可以理解:多租户是一种架构,目的是为了让多用户环境下使用同一套程序,且保证用户间数据隔离。那么重点就很浅显易懂了,**
多租户的重点就是同一套程序下实现多用户数据的隔离**。

本文地址: https://github.com/maxzhao-it/blog/post/40115/

Greenplum简介

官方

全球首个开源、多云、并行 大数据平台

来自网页

Greenplum是pivotal公司推出的一款开源olap的mpp数据库,greenplum的用户在某种程度上甚至超越了pg,很多人可能是通过greenplum才认识的pg,可见greenplum的风靡。下面是greenplum架构:

2018042723003371

Master节点存储全局系统元数据信息,不存储真实数据。数据通过hash分布到不同的segment中,master作为sql的全局入口,负责在segment中分配工作负载,整合处理结果,返回客户端。

Greenplum架构特点如下:

①master节点可以做主备,segment节点也有镜像保证高可用,segment主备尽量混布到不同服务器上。

②支持行列混合存储引擎,同时支持外部表。

③在join时也涉及到数据跨节点重分布的问题,这也是share nothing数据库不可避免的问题。

④高速内部interconnect网络,实现数据join时的高速移动和汇总。

⑤高效的数据并行加载。

来自 PostgreSQL的几种分布式架构对比

了解 share nothing

  • Shared Everthting:一般是针对单个主机,完全透明共享CPU/MEMORY/IO,并行处理能力差,典型的代表SQLServer。 shared-everything架构优点很明显,但是网络,硬盘很容易就会成为系统瓶颈。

  • Shared Disk:各个处理单元使用自己的私有 CPU和Memory,共享磁盘系统。典型的代表Oracle Rac,
    它是数据共享,可通过增加节点来提高并行处理的能力,扩展能力较好。其类似于SMP(对称多处理)模式,但是当存储器接口达到饱和的时候,增加节点并不能获得更高的性能 。

  • Shared Nothing
    :各个处理单元都有自己私有的CPU/内存/硬盘等,不存在共享资源,各处理单元之间通过协议通信,并行处理和扩展能力更好。各节点相互独立,各自处理自己的数据,处理后的结果可能向上层汇总或在节点间流转。Share-Nothing架构在扩展性和成本上都具有明显优势。

了解 MPP

大规模并行处理系统是由许多松耦合处理单元组成的,借助MPP这种高性能的系统架构,Greenplum可以将TB级的数据仓库负载分解,并使用所有的系统资源并行处理单个查询。

本文地址: https://github.com/maxzhao-it/blog/post/34819/

这里使用 SpringBoot 2.4.2 + kafka 2.7.0

开发配置测试

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!-- Springboot 建议使用 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 其它使用 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.6.1</version>
<scope>compile</scope>
<optional>true</optional>
</dependency>

配置

Kafka在属性文件格式中使用键值对进行配置。这些值可以通过文件或编程方式提供。 必备配置如下:

  1. broker.id=0
  2. log.dirs=/tmp/kafka-logs
  3. zookeeper.connect=hostname1:port1,hostname2:port2,hostname3:port3
  4. auto.create.topics.enable=true 是否允许在服务器上自动创建topic

application.yml 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
spring
kafka:
# Kafka集群
bootstrap-servers: 127.0.0.1:9092
producer:
# 重试次数
retries: 0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
acks: 1
# 批量大小
batch-size: 10240
# 提交延时
properties.linger.ms: 0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
buffer-memory: 33554432
# Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class: com.felix.kafka.producer.CustomizePartitioner
# 初始化消费者配置
# 默认的消费组ID
consumer:
properties:
group:
id: defaultConsumerGroup
# 是否自动提交offset
enable-auto-commit: true
# 提交offset延时(接收到消息后多久提交offset)
auto:
commit:
interval:
ms: 1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
auto-offset-reset: latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
properties.session.timeout.ms: 120000
# 消费请求超时时间
properties.request.timeout.ms: 180000
# Kafka提供的序列化和反序列化类
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 批量消费每次最多消费多少条消息
# max-poll-records: 50
listener:
# 消费端监听的topic不存在时,项目启动会报错(关掉)
missing-topics-fatal: false
# 设置批量消费
# type: batch

创建 topic 的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 初始化默认 kafka 分区
*
* @author maxzhao
* @date 2021-01-26 14:37
*/
@Configuration
public class DefaultTestKafkaInitialConfiguration {
public static final String TEST_TOPIC = "test-topic";

/**
* 创建一个 test-topic 的 Topic,设置分区数为 1,分区副本数为 1
* <p>修改分区只需要重新配置分区数、分区副本数,然后重启(数量只能增大)</p>
*/
@Bean
public NewTopic initialTopic() {
return new NewTopic(TEST_TOPIC, 1, (short) 1);
}
}

简单的生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface IKafkaService {
/**
* 发送消息
*
* @param topic 主题
* @param message 消息
*/
void sendMessage(String topic, String message);

}

@Slf4j
@Service
public class KafkaServiceImpl implements IKafkaService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Override
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}

简单的消费者

1
2
3
4
5
6
7
8
9
10
11
12

@Slf4j
@Component
public class KafkaConsumer {
/**
* 消费监听
*/
@KafkaListener(topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC})
public void onMessage1(ConsumerRecord<?, ?> record) {
log.debug("消费:{}-{}-{}", record.topic(), record.partition(), record.value());
}
}

简单测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = start.DemoApplication.class)
public class KafkaTest {
@Autowired
private IKafkaService kafkaService;

/**
* 测试消息发送
*/
@Test
public void sendMessage() {
String message = "这是一个简单的消息!";
log.debug("发送消息:{}-{}", DefaultTestKafkaInitialConfiguration.TEST_TOPIC, message);
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC, message);
log.debug("发送完成");
}
}

输出结果

1
2
3
2021-01-26 15:40:33 DEBUG (KafkaTest.java:31)- 发送消息:test-topic-这是一个简单的消息!
2021-01-26 15:49:32 DEBUG (KafkaTest.java:33)- 发送完成
2021-01-26 15:49:48 DEBUG (KafkaConsumer.java:24)- 消费:test-topic-0-这是一个简单的消息!

带回调的生产者

kafkaTemplate 提供了一个回调方法addCallback,可以在回调方法中监控消息是否发送成功,或失败时做补偿处理 有集中写法,这里简单的介绍两种 lambda和接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

@Slf4j
@Service("kafkaService")
public class KafkaServiceImpl implements IKafkaService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Override
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(success -> {
if (success == null) {
log.debug("消息发送成功,响应数据不存在");
} else {
RecordMetadata recordMetadata = success.getRecordMetadata();
/*消息发送到的topic,消息发送到的分区 partition,消息在分区内的offset*/
log.debug("消息发送成功:topic:{} partition:{} offset:{}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
}, throwable -> {
log.warn("消息发送失败:topic:{} message{} error:{}", topic, message, throwable.getMessage());
});

kafkaTemplate.send(topic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> success) {
if (success == null) {
log.debug("消息发送成功,响应数据不存在");
} else {
RecordMetadata recordMetadata = success.getRecordMetadata();
/*消息发送到的topic,消息发送到的分区 partition,消息在分区内的offset*/
log.debug("消息发送成功:topic:{} partition:{} offset:{}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
}

@Override
public void onFailure(Throwable throwable) {
log.warn("消息发送失败:topic:{} message{} error:{}", topic, message, throwable.getMessage());
}
});

}
}

自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

  1. 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
  2. 若发送消息时未指定 partition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
  3. partition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 partition

※ 我们来自定义一个分区策略,将消息发送到我们指定的 partition,首先新建一个分区器类实现 Partitioner 接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区规则
// ......
// 0 则是默认发到 0号
return 0;
}

@Override
public void close() {
}

@Override
public void configure(Map<String, ?> configs) {
}
}

application.yml中配置自定义分区器,配置的值就是分区器类的全路径名,

1
2
# 自定义分区器
spring.kafka.producer.properties.partitioner.class=gt.maxzhao.mq.config.CustomPartitioner

kafka事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

@Slf4j
@Service("kafkaService")
public class KafkaServiceImpl implements IKafkaService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Override
public void sendMessageWithTransaction(String topic, String message) {
kafkaTemplate.executeInTransaction(operations -> {
operations.send(topic, message).addCallback(success -> {
/*没有发出去,就没有响应*/
}, throwable -> {
/*没有发出去,就没有响应*/
});
/*这里抛出错误,则不会发出消息*/
throw new RuntimeException();
});
}
}

消费者测试

订阅

订阅 topic 是以组的形式进行的

  • 同一组的同一个partition 只能被消费一次。
  • 不同组可以共同消费同一个 partition
  • 同一组的监听数量大于 partition,那么一定有监听空闲。

简单消费

1、指定topic、partition、offset消费

前面监听消费 topic 的时候,监听的是 topic 上所有的消息,如果想指定指定partition、指定offset来消费,直接配置@KafkaListener注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

@Slf4j
@Component
public class KafkaConsumer {
/**
* 消费监听
* <p>可以监听多个</p>
*/
@KafkaListener(topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC})
public void onMessage1(ConsumerRecord<?, ?> record) {
log.debug("消费:{}-{}-{}", record.topic(), record.partition(), record.value());
}

/**
* 指定topic、partition、offset消费
* <p>同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8</p>
**/
@KafkaListener(id = "consumer1", groupId = "consumer-group",
topicPartitions = {
@TopicPartition(topic = DefaultTestKafkaInitialConfiguration.TEST_TOPIC, partitions = {"0"}),
@TopicPartition(topic = DefaultTestKafkaInitialConfiguration.TEST_TOPIC_2, partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
log.debug("消费2:topic:{} partition:{} value:{} offset:{}", record.topic(), record.partition(), record.value(), record.offset());
}
}

onMessage2监听的含义:监听 TEST_TOPIC 的0号分区,同时监听 TEST_TOPIC_2 的0号分区和 TEST_TOPIC_2 的1号分区里面 offset 从8开始的消息。

属性解释:

  • id:消费者ID;
  • groupId:消费组ID;
  • topics:监听的 topic,可监听多个;
  • topicPartitions:可配置更加详细的监听信息,可指定 topicpartitionoffset 监听。
  • 注意:topicstopicPartitions 不能同时使用;

最终结果会发现,onMessage1onMessage2都可以消费 TEST_TOPIC 的消息。

批量消费

设置 application.yml 开启批量消费即可,

1
2
3
4
5
6
# 设置批量消费
spring.kafka.listener.type: batch
# 批量消费每次最多消费50条消息
spring.kafka.consumer.max-poll-records: 50
# 调整延时时间,否则无法一次接收多个信息
spring.kafka.producer.properties.linger.ms: 50

接收消息时用List来接收,监听代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

@Slf4j
@Component
public class KafkaConsumer {
/******************************************************************/
/**
* 批量消费
**/
@KafkaListener(id = "consumer2", groupId = "consumer-group",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3})
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
for (ConsumerRecord<?, ?> record : records) {
log.debug("批量消费:topic:{} partition:{} value:{} offset:{}", record.topic(), record.partition(), record.value(), record.offset());
}
}
}

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = start.DemoApplication.class)
public class KafkaTest {
@Autowired
private IKafkaService kafkaService;

/**
* 测试发送多个消息发送
*/
@Test
public void sendMessage3() {
String message = "这是一个简单的消息!";
String message2 = "这是一个简单的消息!_2";
String message3 = "这是一个简单的消息!_3";
String message4 = "这是一个简单的消息!_4";
log.debug("发送消息:{}-{}", DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message);
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message);
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message2);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message3);
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_3, message4);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
log.debug("发送完成");
}
}

输出结果为

1
2
3
4
2021-01-26 20:23:20 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息! offset:10
2021-01-26 20:23:20 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息!_2 offset:11
2021-01-26 20:23:22 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息!_3 offset:12
2021-01-26 20:23:22 DEBUG (KafkaConsumer.java:52)- 批量消费:topic:test-topic_3 partition:0 value:这是一个简单的消息!_4 offset:13

ConsumerAwareListenerErrorHandler 异常处理器

通过异常处理器,我们可以处理consumer在消费时发生的异常。 新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用 @Bean 注入, 然后我们将这个异常处理器的 Bean
放到 @KafkaListener 注解的 errorHandler 属性里面,当监听抛出异常的时候,则会自动调用异常处理器,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

@Slf4j
@Component
public class KafkaConsumer {
/********************************************************/
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
log.debug("批量消费异常:topic:{} message:{}",
consumer.listTopics(),
message.getPayload());
return null;
};
}

/**
* 批量消费
**/
@KafkaListener(id = "consumer4", groupId = "consumer-group",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_4},
errorHandler = "consumerAwareErrorHandler")
public void onMessage4(List<ConsumerRecord<?, ?>> records) throws Exception {
throw new Exception("批量消费-模拟异常");
}

}

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = start.DemoApplication.class)
public class KafkaTest {
/**
* 测试异常处理
*/
@Test
public void sendMessage4() {
String message = "这是一个简单的消息_4!";
log.debug("发送消息:{}-{}", DefaultTestKafkaInitialConfiguration.TEST_TOPIC_4, message);
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_4, message);
log.debug("发送完成");
}
}

执行看一下效果,

1
2021-01-26 20:36:34 DEBUG (KafkaConsumer.java:61)- 批量消费异常:topic:{test-topic_2=[Partition(topic = test-topic_2, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])], test=[Partition(topic =******************************serialized key size = -1, serialized value size = 32, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 这是一个简单的消息_4!)]

消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由 KafkaListener 处理,不需要的消息则过滤掉。 配置消息过滤只需要为 监听器工厂
配置一个 RecordFilterStrategy(消息过滤策略),返回 true 的时候消息将会被抛弃,返回 false 时,消息能正常抵达监听容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

@Slf4j
@Component
public class KafkaConsumer {
/********************************************************/

@Autowired
private ConsumerFactory consumerFactory;

/**
* 消息过滤器
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if (consumerRecord.value().toString().length() > 20) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}

/**
* 消息过滤
*/
@KafkaListener(id = "consumer5", groupId = "consumer-group",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5},
containerFactory = "filterContainerFactory")
public void onMessage5(String record) {
log.debug("批量消费-拦截长度小于20的字符串: value:{} ", record);
}
}

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = start.DemoApplication.class)
public class KafkaTest {
/**
* 消息过滤
* <p>需要吧提交延时改为0</p>
*/
@Test
public void sendMessage5() {
String message = "这是一个简单的消息_5!";
String message2 = "这是一个简单的消息_5!__________________________________________________";
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5, message);
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5, message2);
log.debug("发送完成");
}
}

输出结果

1
2
3
4
2021-01-26 21:03:10 DEBUG (KafkaTest.java:96)- 发送完成
2021-01-26 21:03:10 DEBUG (KafkaServiceImpl.java:33)- 消息发送成功:topic:test-topic_5 partition:0 offset:26
2021-01-26 21:03:10 DEBUG (KafkaConsumer.java:112)- 批量消费-拦截长度小于20的字符串: value:这是一个简单的消息_5!__________________________________________________
2021-01-26 21:03:10 DEBUG (KafkaServiceImpl.java:33)- 消息发送成功:topic:test-topic_5 partition:0 offset:27

消息转发

消息转发在实际开发中,应用A从TopicA 获取到消息,经过处理后转发到TopicB, 再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
SpringBoot集成Kafka实现消息的转发,只需要通过一个@SendTo 注解,被注解方法的return值即转发的消息内容,如下,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

@Slf4j
@Component
public class KafkaConsumer {
/**
* 消息转发给 5
*/
@KafkaListener(id = "consumer6", groupId = "consumer-group",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_6})
@SendTo(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_5)
public String onMessage6(List<ConsumerRecord<?, ?>> records) {
log.debug("消息转发 6:topic:{} partition:{} value:{} offset:{}",
records.get(0).topic(),
records.get(0).partition(),
records.get(0).value(),
records.get(0).offset());
return "消息转发:" + records.get(0).value().toString();
}
}

输出结果,如果结果不同,可以查看自己设置的延时时间及配置

1
2
3
2021-01-30 14:26:29 DEBUG (KafkaConsumer.java:122)- 消息转发 6:topic:test-topic_6 partition:0 value:这是一个简单的消息_6! offset:10
2021-01-30 14:26:29 DEBUG (KafkaConsumer.java:122)- 消息转发 6:topic:test-topic_6 partition:0 value:这是一个简单的消息_6!———————————————————————————————————————————————— offset:11
2021-01-30 14:26:29 DEBUG (KafkaConsumer.java:112)- 批量消费-拦截长度小于20的字符串: value:消息转发:这是一个简单的消息_6!————————————————————————————————————————————————

消费监听的起停

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息, 那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,
或者在我们指定的时间点停止工作,使用 KafkaListenerEndpointRegistry

实现:

  1. 禁止监听器自启动;
  2. 延时开启,发送测试;
  3. 延时暂停,发送测试;
  4. 延时停止,发送测试;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
@Component
public class KafkaConsumer {
/**
* 监听器容器工厂(设置禁止KafkaListener自启动)
*/
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
/*禁止 KafkaListener 自启动*/
container.setAutoStartup(false);
return container;
}

/**
* 消息监听的起停控制
*/
@KafkaListener(id = "consumer7", groupId = "consumer-group",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7},
containerFactory = "delayContainerFactory")
public void onMessage7(String record) {
log.debug("消息监听的起停控制-启动后可以接收到的消息: value:{} ", record);
}
}

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = start.DemoApplication.class)
public class KafkaTest {
/**
* 消息监听的起停控制 7
*/
@Test
public void sendMessage7() {
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 默认未未启动,这是一个简单的消息_7!");
/*让启动后收到消息更明显*/
try { Thread.sleep(1000); } catch (InterruptedException e) { }
if (!registry.getListenerContainer("consumer7").isRunning()) {
log.debug("监听 7 还未启动,准备启动监听7");
registry.getListenerContainer("consumer7").start();
}
log.debug("监听 7 启动");
try { Thread.sleep(1000); } catch (InterruptedException e) { }
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 已启动——————————!");
try { Thread.sleep(1000); } catch (InterruptedException e) { }
registry.getListenerContainer("consumer7").pause();
log.debug("监听 7 暂停");
try { Thread.sleep(5000); } catch (InterruptedException e) { }
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 已暂停——————————————!");
try { Thread.sleep(1000); } catch (InterruptedException e) { }
registry.getListenerContainer("consumer7").stop();
log.debug("监听 7 停止");
try { Thread.sleep(1000); } catch (InterruptedException e) { }
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_7, "监听 7 已停止——————————————!");
log.debug("发送完成");
try { Thread.sleep(5000); } catch (InterruptedException e) { }
}
}

运行结果,

  1. 暂停监听后,消息不会被接收(如果 pause 时,延迟1秒,可能消息会被接收到,与设想的不一样,可以调用 isConsumerPaused查看消息是否被暂停)
  2. 监听停止后,就不在接收消息。
  3. 虽然监听停止了,但因为 kafka 服务还在,所以消息是发送成功的。
  4. 监听停止,经过简单测试发现是立即执行的,监听暂停不是立即执行的。
  5. pause()方法在下一次poll()之前生效,而resume()方法在当前的poll()之后生效。当一个容器被暂停,它会继续拉取消费者,避免再均衡(如果组管理有使用),但是不会索取任何记录。

pause后延迟5秒的结果:

1
2
3
4
5
6
7
8
9
10
11
2021-02-01 11:44:22 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 默认未未启动,这是一个简单的消息_7! topic:test-topic_7 partition:0 offset:11
2021-02-01 11:44:23 DEBUG (KafkaTest.java:127)- 监听 7 还未启动,准备启动监听7
2021-02-01 11:44:23 DEBUG (KafkaTest.java:130)- 监听 7 启动
2021-02-01 11:44:24 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已启动——————————! topic:test-topic_7 partition:0 offset:12
2021-02-01 11:44:24 DEBUG (KafkaConsumer.java:151)- 消息监听的起停控制-启动后可以接收到的消息: value:监听 7 已启动——————————!
2021-02-01 11:44:25 DEBUG (KafkaTest.java:135)- 监听 7 暂停
2021-02-01 11:44:30 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已暂停——————————————! topic:test-topic_7 partition:0 offset:13
2021-02-01 11:44:31 DEBUG (KafkaTest.java:140)- 监听 7 停止
2021-02-01 11:44:32 DEBUG (KafkaTest.java:143)- 发送完成
2021-02-01 11:44:32 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已停止——————————————! topic:test-topic_7 partition:0 offset:14

pause后延迟1秒的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
2021-02-01 11:39:59 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 默认未未启动,这是一个简单的消息_7! topic:test-topic_7 partition:0 offset:7
2021-02-01 11:40:00 DEBUG (KafkaTest.java:127)- 监听 7 还未启动,准备启动监听7
2021-02-01 11:40:00 DEBUG (KafkaTest.java:130)- 监听 7 启动
2021-02-01 11:40:01 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已启动——————————! topic:test-topic_7 partition:0 offset:8
2021-02-01 11:40:02 DEBUG (KafkaConsumer.java:151)- 消息监听的起停控制-启动后可以接收到的消息: value:监听 7 已启动——————————!
2021-02-01 11:40:02 DEBUG (KafkaTest.java:135)- 监听 7 暂停
2021-02-01 11:40:03 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已暂停——————————————! topic:test-topic_7 partition:0 offset:9
2021-02-01 11:40:03 DEBUG (KafkaConsumer.java:151)- 消息监听的起停控制-启动后可以接收到的消息: value:监听 7 已暂停——————————————!
2021-02-01 11:40:05 DEBUG (KafkaTest.java:140)- 监听 7 停止
2021-02-01 11:40:06 DEBUG (KafkaTest.java:144)- 发送完成
2021-02-01 11:40:06 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:监听 7 已停止——————————————! topic:test-topic_7 partition:0 offset:10


分组测试消费

测试同一组、不同组消费同一个 topicpartition

查看 topicbin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic_8

1
2
Topic: test-topic_8	PartitionCount: 1	ReplicationFactor: 1	Configs: 
Topic: test-topic_8 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

当前 topic 只有一个 partition

测试前把当前 kafka 配置改为立即发送,并且设置不多发。

1
2
3
4
spring: 
kafka:
# 发送等待时间
producer.properties.linger.ms: 0

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(id = "consumer8", groupId = "consumer-group8",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8})
public void onMessage8(String record) {
log.debug("分组测试消费:group8-consumer8 收到消息: value:{} ", record);
}
@KafkaListener(id = "consumer8_1", groupId = "consumer-group8",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8})
public void onMessage8_1(String record) {
log.debug("分组测试消费:group8-consumer8_1 收到消息: value:{} ", record);
}
@KafkaListener(id = "consumer8_2", groupId = "consumer-group8_2",
topics = {DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8})
public void onMessage8_2(String record) {
log.debug("分组测试消费:group8_2-consumer8_2 收到消息: value:{} ", record);
}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = start.DemoApplication.class)
public class KafkaTest {
/**
* 分组测试消费 8
*/
@Test
public void sendMessage8() {
kafkaService.sendMessage(DefaultTestKafkaInitialConfiguration.TEST_TOPIC_8, "这是一个简单的消息_8!");
log.debug("发送完成");
}
}

测试结果

1
2
3
4
2021-02-01 15:40:43 DEBUG (KafkaTest.java:172)- 发送完成
2021-02-01 15:40:43 DEBUG (KafkaServiceImpl.java:30)- 消息发送成功:message:这是一个简单的消息_8! topic:test-topic_8 partition:0 offset:0
2021-02-01 15:40:43 DEBUG (KafkaConsumer.java:168)- 分组测试消费:group8-consumer8_1 收到消息: value:这是一个简单的消息_8!
2021-02-01 15:40:43 DEBUG (KafkaConsumer.java:176)- 分组测试消费:group8_2-consumer8_2 收到消息: value:这是一个简单的消息_8! :0 offset:0

也就是意味着group8中的两个监听只有一个监听到轮数据。

以下来自简书

简介

kafka是一个分布式消息队列。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。

kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了做到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。

每新写一条消息,kafka就是在对应的文件append写,所以性能非常高。

kafka的总体数据流是这样的:

img

kafka data flow

大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。可以看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的,下面会讲到。

关于broker、topics、partitions的一些元信息用zk来存,监控和路由啥的也都会用到zk。


生产

基本流程是这样的:

img

kafka sdk product flow.png

创建一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,然后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,然后一起发送,不是调send()就进行立刻进行网络发包。
如果partition没填,那么情况会是这样的:

  1. key有填
    按照key进行哈希,相同key去一个partition。(如果扩展了partition的数量那么就不能保证了)
  2. key没填
    round-robin来选partition

这些要发往同一个partition的请求按照配置,攒一波,然后由一个单独的线程一次性发过去。

API

有high level api,替我们把很多事情都干了,offset,路由啥都替我们干了,用以来很简单。
还有simple api,offset啥的都是要我们自己记录。

partition

当存在多副本的情况下,会尽量把多个副本,分配到不同的broker上。kafka会为partition选出一个leader,之后所有该partition的请求,实际操作的都是leader,然后再同步到其他的follower。当一个broker歇菜后,所有leader在该broker上的partition都会重新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)

然后这里就涉及两个细节:怎么分配partition,怎么选leader。

关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。

partition的分配

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上 (这个就是leader)
  3. 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

leader容灾

controller会在Zookeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partition的ISR(in-sync replica已同步的副本)列表,选一个出来做leader。
选出leader后,更新zk,然后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。为什么这里不是使用zk通知,而是直接给broker发送rpc请求,我的理解可能是这样做zk有性能问题吧。

如果ISR列表是空,那么会根据配置,随便选一个replica做leader,或者干脆这个partition就是歇菜。如果ISR列表的有机器,但是也歇菜了,那么还可以等ISR的机器活过来。

多副本同步

这里的策略,服务端这边的处理是follower从leader批量拉取数据来同步。但是具体的可靠性,是由生产者来决定的。
生产者生产消息的时候,通过request.required.acks参数来设置数据的可靠性。

acks what happen
0 which means that the producer never waits for an acknowledgement from the broker.发过去就完事了,不关心broker是否处理成功,可能丢数据。
1 which means that the producer gets an acknowledgement after the leader replica has received the data. 当写Leader成功后就返回,其他的replica都是通过fetcher去同步的,所以kafka是异步写,主备切换可能丢数据。
-1 which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。

在acks=-1的时候,如果ISR少于min.insync.replicas指定的数目,那么就会返回不可用。

这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。以前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,因为这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。

从ISA中选出leader后,follower会从把自己日志中上一个高水位后面的记录去掉,然后去和leader拿新的数据。因为新的leader选出来后,follower上面的数据,可能比新leader多,所以要截取。这里高水位的意思,对于partition和leader,就是所有ISR中都有的最新一条记录。消费者最多只能读到高水位;

从leader的角度来说高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一轮的fetch中才能告诉leader。

也正是由于这个高水位延迟一轮,在一些情况下,kafka会出现丢数据和主备数据不一致的情况,0.11开始,使用leader epoch来代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection

*思考:*
当acks=-1时

  1. 是follwers都来fetch就返回成功,还是等follwers第二轮fetch?
  2. leader已经写入本地,但是ISR中有些机器失败,那么怎么处理呢?

消费

订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。

img

untitled_page.png

API

订阅topic时,可以用正则表达式,如果有新topic匹配上,那能自动订阅上。

offset的保存

一个消费组消费partition,需要保存offset记录消费到哪,以前保存在zk中,由于zk的写性能不好,以前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,而且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。总是保留最新的key,其余删掉。一般情况下,每个key的offset都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历partition建立缓存,然后查询返回。

确定consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式:

1
2
3
__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。

*思考:*
如果正在跑的服务,修改了offsets.topic.num.partitions,那么offset的保存是不是就乱套了?

分配partition–reblance

生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。
下面从顶向下,分别阐述一下

  • 怎么选coordinator。
  • 交互流程。
  • reblance的流程。

选coordinator

  • 看offset保存在那个partition
  • 该partition leader所在的broker就是被选定的coordinator

这里我们可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一台机器。

交互流程

把coordinator选出来之后,就是要分配了
整个流程是这样的:

  1. consumer启动、或者coordinator宕机了,consumer会任意请求一个broker,发送ConsumerMetadataRequest请求,broker会按照上面说的方法,选出这个consumer对应coordinator的地址。
    2.consumer 发送heartbeat请求给coordinator,返回IllegalGeneration的话,就说明consumer的信息是旧的了,需要重新加入进来,进行reblance。返回成功,那么consumer就从上次分配的partition中继续执行。

reblance流程

  1. consumer给coordinator发送JoinGroupRequest请求。
  2. 这时其他consumer发heartbeat请求过来时,coordinator会告诉他们,要reblance了。
  3. 其他consumer发送JoinGroupRequest请求。
  4. 所有记录在册的consumer都发了JoinGroupRequest请求之后,coordinator就会在这里consumer中随便选一个leader。然后回JoinGroupRespone,这会告诉consumer你是follower还是leader,对于leader,还会把follower的信息带给它,让它根据这些信息去分配partition
  5. consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。
  6. coordinator回包,把分配的情况告诉consumer,包括leader。

当partition或者消费者的数量发生变化时,都得进行 reblance
列举一下会 reblance 的情况:

  • 增加 partition
  • 增加消费者
  • 消费者主动关闭
  • 消费者宕机了
  • coordinator 自己也宕机了

消息投递语义

kafka支持3种消息投递语义
At most once:最多一次,消息可能会丢失,但不会重复
At least once:最少一次,消息不会丢失,可能会重复
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)

在业务中,常常都是使用At least once的模型,如果需要可重入的话,往往是业务自己实现。

At least once

先获取数据,再进行业务处理,业务处理成功后commit offset。
1、生产者生产消息异常,消息是否成功写入不确定,重做,可能写入重复的消息
2、消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费

At most once

先获取数据,再commit offset,最后进行业务处理。
1、生产者生产消息异常,不管,生产下一个消息,消息就丢了
2、消费者处理消息,先更新offset,再做业务处理,做业务处理失败,消费者重启,消息就丢了

Exactly once

思路是这样的,首先要保证消息不丢,再去保证不重复。所以盯着At least once的原因来搞。 首先想出来的:

  1. 生产者重做导致重复写入消息—-生产保证幂等性
  2. 消费者重复消费—消灭重复消费,或者业务接口保证幂等性重复消费也没问题

由于业务接口是否幂等,不是kafka能保证的,所以kafka这里提供的exactly once是有限制的,消费者的下游也必须是kafka。所以一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统做了适配,实现了exactly once)。

生产者幂等性好做,没啥问题。

解决重复消费有两个方法:

  1. 下游系统保证幂等性,重复消费也不会导致多条记录。
  2. 把commit offset和业务处理绑定成一个事务。

本来exactly once实现第1点就ok了。

但是在一些使用场景下,我们的数据源可能是多个topic,处理后输出到多个topic,这时我们会希望输出时要么全部成功,要么全部失败。这就需要实现事务性。既然要做事务,那么干脆把重复消费的问题从根源上解决,把commit offset和输出到其他topic绑定成一个事务。

生产幂等性

思路是这样的,为每个producer分配一个pid,作为该producer的唯一标识。producer会为每一个<topic,partition>维护一个单调递增的seq。类似的,broker也会为每个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。因为:

  1. 消息的seq比broker的seq大超过时,说明中间有数据还没写入,即乱序了。

  2. 消息的seq不比broker的seq小,那么说明该消息已被保存。

    img

    解决重复生产

事务性/原子性广播

场景是这样的:

  1. 先从多个源topic中获取数据。
  2. 做业务处理,写到下游的多个目的topic。
  3. 更新多个源topic的offset。

其中第2、3点作为一个事务,要么全成功,要么全失败。这里得益与offset实际上是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。

img

基本思路是这样的:
引入tid(transaction id),和pid不同,这个id是应用程序提供的,用于标识事务,和producer是谁并没关系。就是任何producer都可以使用这个tid去做事务,这样进行到一半就死掉的事务,可以由另一个producer去恢复。
同时为了记录事务的状态,类似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每个tid对应唯一一个transaction coordinator。
注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。

做事务时,先标记开启事务,写入数据,全部成功就在transaction log中记录为prepare commit状态,否则写入prepare abort的状态。之后再去给每个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message可以被读取或已经废弃。成功后在transaction log记录下commit/abort状态,至此事务结束。

数据流:

img

Kafka Transactions Data Flow.png

  1. 首先使用tid请求任意一个broker(代码中写的是负载最小的broker),找到对应的transaction coordinator。

  2. 请求transaction coordinator获取到对应的pid,和pid对应的epoch,这个epoch用于防止僵死进程复活导致消息错乱,当消息的epoch比当前维护的epoch小时,拒绝掉。tid和pid有一一对应的关系,这样对于同一个tid会返回相同的pid。

  3. client先请求transaction coordinator记录<topic,partition>的事务状态,初始状态是BEGIN,如果是该事务中第一个到达的<topic,partition>,同时会对事务进行计时;client输出数据到相关的partition中;client再请求transaction coordinator记录offset的<topic,partition>事务状态;client发送offset commit到对应offset partition。

  4. client发送commit请求,transaction coordinator记录prepare commit/abort,然后发送marker给相关的partition。全部成功后,记录commit/abort的状态,最后这个记录不需要等待其他replica的ack,因为prepare不丢就能保证最终的正确性了。

这里prepare的状态主要是用于事务恢复,例如给相关的partition发送控制消息,没发完就宕机了,备机起来后,producer发送请求获取pid时,会把未完成的事务接着完成。

当partition中写入commit的marker后,相关的消息就可被读取。所以kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。

详细细节可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees

消费事务

前面都是从生产的角度看待事务。还需要从消费的角度去考虑一些问题。
消费时,partition中会存在一些消息处于未commit状态,即业务方应该看不到的消息,需要过滤这些消息不让业务看到,kafka选择在消费者进程中进行过来,而不是在broker中过滤,主要考虑的还是性能。kafka高性能的一个关键点是zero copy,如果需要在broker中过滤,那么势必需要读取消息内容到内存,就会失去zero copy的特性。


文件组织

kafka的数据,实际上是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。

在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。

每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:

img

kafka 文件组织.png

为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。

baseOffset:意思是这条索引对应segment文件中的第几条message。这样做方便使用数值压缩算法来节省空间。例如kafka使用的是varint。

position:在segment中的绝对位置。

查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。


常用配置项

broker配置

配置项 作用
broker.id broker的唯一标识
auto.create.topics.auto 设置成true,就是遇到没有的topic自动创建topic。
log.dirs log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。

topic配置

配置项 作用
num.partitions 新建一个topic,会有几个partition。
log.retention.ms 对应的还有minutes,hours的单位。日志保留时间,因为删除是文件维度而不是消息维度,看的是日志文件的mtime。
log.retention.bytes partion最大的容量,超过就清理老的。注意这个是partion维度,就是说如果你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。
log.segment.bytes 一个segment的大小。超过了就滚动。
log.segment.ms 一个segment的打开时间,超过了就滚动。
message.max.bytes message最大多大

关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。
还有0.10之前的版本,时间看的是日志文件的mtime,但这个指是不准确的,有可能文件被touch一下,mtime就变了。因此在0.10版本开始,改为使用该文件最新一条消息的时间来判断。
按大小清理这里也要注意,Kafka在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。如果超过但是没超过一个日志段,那么就不会删除。

本文地址: https://github.com/maxzhao-it/blog/post/50825/

kafka介绍1

kafka.apachecn介绍

一、安装

kafka 可以通过官网下载

kafka 根据Scala版本不同,又分为多个版本,我不需要使用Scala,所以就下载官方推荐版本kafka_2.13-2.7.0.tgz

解压到opt目录下

1
2
3
4
5
6
7
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -xzf kafka_2.13-3.2.1.tgz -C ~/
cd ~/
# 为了使用方便,可以创建软链接`kafka`
ln -s kafka_2.13-3.2.1 kafka
#mv kafka_2.13-2.7.0 kafka
cd kafka

kafka的安装需要依赖Zookeeper

二、默认的Zookeeper

kafka自带的Zookeeper程序使用bin/zookeeper-server-start.sh,以及bin/zookeeper-server-stop.sh来启动和停止Zookeeper

Zookeeper的配制文件是config/zookeeper.properties,可以修改其中的参数

(1)启动Zookeeper

1
2
3
4
5
# 修改默认端口
sed -i 's?clientPort=2181?clientPort=42181?' ~/kafka/config/zookeeper.properties
cat ~/kafka/config/zookeeper.properties
~/kafka/bin/zookeeper-server-start.sh -daemon ~/kafka/config/zookeeper.properties
tail -f ~/kafka/logs/zookeeper.out
  • -daemon参数,可以在后台启动Zookeeper,输出的信息在保存在执行目录的logs/zookeeper.out文件中。

问题:对于小内存的服务器,启动时有可能会出现如下错误。

os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)

可以通过修改bin/zookeeper-server-start.sh中的参数,来减少内存的使用,将-Xmx512M -Xms512M改小。

(2)关闭Zookeeper

1
~/kafka/bin/zookeeper-server-stop.sh -daemon ~/kafka/config/zookeeper.properties

三、Kafka配置

kafka的配置文件在config/server.properties文件中,主要修改参数如下,更具体的参数说明以后再整理下。

  1. broker.idkafka broker的编号,集群里每个brokerid需不同。默认从0开始。
  2. listeners是监听地址,需要提供外网服务的话,要设置本地的IP地址
  3. log.dirs是日志目录,需要设置
  4. 设置Zookeeper集群地址,我是在同一个服务器上搭建了kafkaZookeeper,所以填的本地地址
  5. num.partitions 为新建Topic的默认Partition数量,partition数量提升,一定程度上可以提升并发性,数值应该小于等于broker的数量
  6. 因为要创建kafka集群,所以kafka的所有文件都复制两份,配置文件做相应的修改,尤其是brokeridIP地址和log.dirs。分别创建软链接kafka1kafka2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 集群里每个`broker`的`id`需不同。默认从0开始。
broker.id=0
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# 监听的地址和端口,当前监听本机所有
listeners=EXTERNAL://0.0.0.0:49092
# 主机地址
advertised.listeners=PLAINTEXT://192.168.14.123:49092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# 分区
num.partitions=1
############################# Zookeeper #############################
zookeeper.connect=localhost:42181

四、启动及停止Kafka

(1)启动kafka

1
2
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server.properties
tail -f ~/kafka/logs/kafkaServer.out

-daemon 参数会将任务转入后台运行,输出日志信息将写入日志文件,查看 logs/kafkaServer.out,如果结尾输同started说明启动成功。

也可以用ps -ef|grep kafka命令,看有没有kafka的进程

kafka默认的 xmx xms都是 1G, 对于小内存的服务器,启动时有可能会出现如下错误。

os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)

可以通过修改bin/kafka-server-start.sh中的参数,来减少内存的使用,将配置改为-Xmx256M -Xms64M

(2)停止kafka

1
~/kafka/bin/kafka-server-stop.sh -daemon ~/kafka/config/server.properties

五、测试

前提:kafkaZookeeper已启动完成

1、创建topic

创建一个名为 testTopic,只有一个备份(replication-facto)和一个分区(partitions)。

1
~/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.14.123:49092 --replication-factor 1 --partitions 1 --topic test

成功提示 Created topic test.

2、查看主题

1
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

成功提示

test

3、发送消息

Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行producer(生产者),然后在控制台输入几条消息到服务器。

1
2
3
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.14.123:49092 --topic test
First message;
Second mssage;

4、接收消息

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.14.123:49092 --topic test --from-beginning

展示上面输入的两个消息,表示成功;

  • --from-beginning 是从开始的消息展示

5、查看特定主题的详细信息

1
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --describe  --topic test
1
2
Topic: test	PartitionCount: 1	ReplicationFactor: 1	Configs: 
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  • “leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
  • “replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
  • “isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。

6、删除主题

1
2
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --delete  --topic test
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

六、设置多个broker集群

一个 broker只是集群中的一个节点,id是这个节点的名称。

1、创建多个节点的配置

1
2
cp ~/kafka/config/server.properties ~/kafka/config/server-1.properties 
cp ~/kafka/config/server.properties ~/kafka/config/server-2.properties

修改每个配置的 idid 从0开始。

简单的配置如下

server-1.properties

1
2
3
4
5
6
7
8
9
10
# 修改节点名称
broker.id=1
# 监听的地址和端口,当前监听本机所有
listeners=PLAINTEXT://0.0.0.0:49093
# 主机地址
advertised.listeners=PLAINTEXT://192.168.14.123:49093
# 修改日志目录,防止覆盖日志
log.dir=/tmp/kafka-logs-1
# 配置 Zookeeper
zookeeper.connect=192.168.14.123:4181

server-2.properties

1
2
3
4
5
6
7
8
9
10
# 修改节点名称
broker.id=2
# 监听的地址和端口,当前监听本机所有
listeners=PLAINTEXT://0.0.0.0:49094
# 主机地址
advertised.listeners=PLAINTEXT://192.168.14.123:49094
# 修改日志目录,防止覆盖日志
log.dir=/tmp/kafka-logs-2
# 配置 Zookeeper
zookeeper.connect=192.168.14.123:4181

2、启动其它配置的服务

1
2
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server-1.properties
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server-2.properties

3、创建新的 topic

1
2
~/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.14.123:49092 --replication-factor 3 --partitions 1 --topic my-replicated-3-topic 
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

成功提示:`Created topic my-replicated-3-topic.

这时候已经完成了一个集群的配置。

启动失败解决

如果报 replication 不够的错误,则是因为 server1server2可能启动失败了。

可以把 -daemon参数删掉,然后执行命令查看错误 bin/kafka-server-start.sh config/server-1.properties;

可以直接查看日志 logs/server.out;

4、查看主题列表

1
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --describe  --topic my-replicated-3-topic 
1
2
Topic: my-replicated-3-topic	PartitionCount: 1	ReplicationFactor: 3	Configs: 
Topic: my-replicated-3-topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

leader: 1表示当前 leader在节点1上。

5、收发消息测试

接收消息

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.14.123:49092 --topic my-replicated-3-topic

发送消息

1
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.14.123:49092 --topic my-replicated-3-topic

6、测试集群 leader崩溃

停止 leader1

1
2
3
4
5
6
# 查询进程
ps -ef|grep server-1.properties
# 查询 kafka java 进程更方便
jps -mv |grep server-1.properties
# 杀掉进程
kill -9 xxx

查看主题

1
~/kafka/bin/kafka-topics.sh --describe --broker-list 192.168.14.123:49092 --topic my-replicated-3-topic

此时的 leader已经变为了 0,所以 broker 0 成为了 leader, broker1已经不在备份集合里了。

1
2
Topic: my-replicated-3-topic	PartitionCount: 1	ReplicationFactor: 3	Configs: 
Topic: my-replicated-3-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2

发送消息测试

1
~/kafka/bin/kafka-console-producer.sh --broker-list 192.168.14.123:49094 --topic my-replicated-3-topic

查询所有消息

1
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.14.123:49092 --topic my-replicated-3-topic --from-beginning

这个时候会发现所有的消息都没有丢

删除主题

1
2
~/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.14.123:49092 --delete  --topic my-replicated-3-topic
~/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.14.123:49092

七、安全

官方说明

Zookeeper配置

当前下载的kafka程序里自带Zookeeper,可以直接使用其自带的Zookeeper建立集群,也可以单独使用Zookeeper安装文件建立集群。

1. 单独使用Zookeeper安装文件建立集群

Zookeeper的安装及配置可以参考另一篇博客,里面有详细介绍

ZooKeeper的安装和配置过程

https://www.cnblogs.com/zhaoshizi/p/12105143.html

Kafka与Zookeeper的关系

一个典型的Kafka集群中包含若干Produce,若干broker(一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
img
1)Producer端直接连接broker.list列表,从列表中返回TopicMetadataResponse,该Metadata包含Topic下每个partition leader建立socket连接并发送消息.

2)Broker端使用zookeeper用来注册broker信息,以及监控partition leader存活性.

3)Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。

Zookeeper作用:管理broker、consumer

创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。具体的,通过注册watcher,获取partition的信息。

Topic的注册,zookeeper会维护topic与broker的关系,通/brokers/topics/topic.name节点来记录。

Producer向zookeeper中注册watcher,了解topic的partition的消息,以动态了解运行情况,实现负载均衡。Zookeepr不管理producer,只是能够提供当前broker的相关信息。

Consumer可以使用group形式消费kafka中的数据。所有的group将以轮询的方式消费broker中的数据,具体的按照启动的顺序。Zookeeper会给每个consumer group一个ID,即同一份数据可以被不同的用户ID多次消费。因此这就是单播与多播的实现。以单个消费者还是以组别的方式去消费数据,由用户自己去定义。Zookeeper管理consumer的offset跟踪当前消费的offset。

kafka使用ZooKeeper用于管理、协调代理。每个Kafka代理通过Zookeeper协调其他Kafka代理。
当Kafka系统中新增了代理或某个代理失效时,Zookeeper服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。

Zookeeper在Kakfa中扮演的角色:Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的

· kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。
· 而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除 broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)
· Broker端使用zookeeper来注册broker信息,以及监测partitionleader存活性.
· Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partitionleader建立socket连接,并获取消息.
· Zookeer和Producer没有建立关系,只和Brokers、Consumers建立关系以实现负载均衡,即同一个ConsumerGroup中的Consumers可以实现负载均衡(因为Producer是瞬态的,可以发送后关闭,无需直接等待)

本文地址: https://github.com/maxzhao-it/blog/post/4175/

一、Kafka Connect 导入/导出 数据

从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。

Kafka Connect是导入和导出数据的一个工具。是一个在 kafka与其他系统之间的可扩展和可靠的流数据工具。它使得快速定义连接器变得非常简单,这些连接器可以将大量数据移入和移出KafkaKafka Connect
可以摄取整个数据库或者从你所有的应用服务器上收集指标到Kafka主题中,使数据可以用于低延迟的流处理。导出作业可以将数据从Kafka主题交付到二级存储和查询系统,或者交付到批处理系统以进行离线分析。

Kafka Connect功能包括:

  • Kafka Connect是Kafka连接器的通用框架——Kafka Connect标准化了其他数据系统与Kafka的集成,简化了连接器的开发、部署和管理
  • 分布式和独立模式——向上扩展到支持整个组织的大型集中管理服务,或者向下扩展到开发、测试和小型生产部署
  • 通过一个易于使用的REST API向Kafka Connect集群提交和管理连接器
  • 自动偏移量管理——Kafka Connect可以自动管理偏移量提交过程,所以连接器开发人员不需要担心连接器开发中这个容易出错的部分
  • Kafka Connect建立在现有的组管理协议上。可以添加更多的worker来扩展Kafka Connect集群。
  • 流/批处理集成-利用Kafka现有的功能,Kafka Connect是一个理想的解决方案桥接流和批处理数据系统

一些概念

  • kafka connector:是kafka connect的关键组成部分,它是一个逻辑上的job,用于在kafka和其他系统之间拷贝数据,比如:从上游系统拷贝数据到kafka,或者从kafka拷贝数据到下游系统

  • Tasks:每个kafka connector可以初始化一组task进行数据的拷贝

  • Workers:逻辑上包含kafka connector和tasks用来调度执行具体任务的进程,具体执行时分为standalone模式和distributed模式

下面介绍配置、运行和管理Kafka Connect。

二、运行 Kafka Connect

Kafka Connect目前支持两种执行模式:单机(单进程)和分布式。

在独立模式下,所有工作都在单个进程中执行。这种配置更容易设置和开始,可能在只有一个worker有意义的情况下有用(例如收集日志文件),但它没有从Kafka Connect的一些特性中受益,比如容错。可以使用如下命令启动独立进程:

1
bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

单例配置

配置文件在 config/connect-standalone.properties

首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 连接器的唯一名称。再次尝试注册相同名称将会失败。
nname
# kafka 服务器地址
bootstrap.servers=localhost:9092
# 唯一的字符串,用于标识此worker所属的Connect集群组。
# group.id=
# 用于在Kafka连接格式和写入Kafka的序列化格式之间转换的转换器类。
# 控制了从Kafka写入或读取的消息中的键的格式,并且由于它独立于连接器,它允许任何连接器使用任何序列化格式。常见的格式包括JSON和Avro。
key.converter=org.apache.kafka.connect.json.JsonConverter
# 控制了写入或从Kafka读取的消息中的值的格式,并且由于它独立于连接器,它允许任何连接器使用任何序列化格式。常见的格式包括JSON和Avro。
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 用逗号分隔的主题列表,用作此连接器的输入的话题
# topics
# 主题的Java正则表达式,用作此连接器的输入对于任何其他选项,您应该查阅连接器的文档。
# topics.regex=
# kafka topic仓库配置,需要手动创建主题,以确保正确配置 default connect-configs
# config.storage.topic
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

控制台操作

在这个快速入门里,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件。

首先,我们首先创建一些种子数据用来测试:

1
echo -e "foo\nbar" > test.txt

接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。

1
2
3
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
# 这是分布式的命令,暂时不考虑
bin/connect-distributed.sh config/connect-distributed.properties

这些包含在Kafka中的示例配置文件使用之前启动的默认本地群集配置,并创建两个连接器: 第一个是源连接器,用于从输入文件读取行,并将其输入到 Kafka topic。 第二个是接收器连接器,它从Kafka
topic中读取消息,并在输出文件中生成一行。

在启动过程中,你会看到一些日志消息,包括一些连接器正在实例化的指示。 一旦Kafka Connect进程启动,源连接器就开始从test.txt读取行并且 将它们生产到主题connect-test
中,同时接收器连接器也开始从主题connect-test中读取消息, 并将它们写入文件test.sink.txt中。我们可以通过检查输出文件的内容来验证数据是否已通过整个pipeline进行交付:

1
more test.sink.txt

出现结果

1
2
foo
bar

请注意,数据存储在Kafka topicconnect-test中,因此我们也可以运行一个console consumer(控制台消费者)来查看 topic 中的数据(或使用custom
consumer(自定义消费者)代码进行处理):

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

结果:

1
2
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

连接器一直在处理数据,所以我们可以将数据添加到文件中,并看到它在pipeline 中移动:

1
2
# 追加一行数据
echo Another line >> test.txt

应该可以看到这一行出现在控制台用户输出和接收器文件中。

三、MySql 操作

https://www.pianshen.com/article/3877182847/

四、详细配置

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
config.storage.topic kafka topic仓库配置 string high
group.id 唯一的字符串,用于标识此worker所属的Connect集群组。 string high
key.converter 用于Kafka Connect和写入到Kafka的序列化消息的之间格式转换的转换器类。 这可以控制写入或从kafka读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 class high
offset.storage.topic 连接器的offset存储到哪个topic中 string high
status.storage.topic 追踪连接器和任务状态存储到哪个topic中 string high
value.converter 用于Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 class high
internal.key.converter 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 这可以控制写入或从Kafka读取的消息中的key的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 class low
internal.value.converter 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 class low
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口列表。此列表用来发现完整服务器集的初始主机。 该列表的格式应为host1:port1,host2:port2,….由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此,不需要包含完整的服务器(尽管如此,你需要多配置几个,以防止配置的宕机)。 list localhost:9092 high
heartbeat.interval.ms 心跳间隔时间。心跳用于确保会话保持活动,并在新成员加入或离开组时进行重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 int 3000 high
rebalance.timeout.ms 限制所有组中消费者的任务处理数据和提交offset所需的时间。如果超时,那么woker将从组中删除,这也将导致offset提交失败。 int 60000 high
session.timeout.ms 用于察觉worker故障的超时时间。worker定时发送心跳以表明自己是活着的。如果broker在会话超时时间到期之前没有接收到心跳,那么broker将从分组中移除该worker,并启动重新平衡。注意,该值必须在group.min.session.timeout.msgroup.max.session.timeout.ms范围内。 int 10000 high
ssl.key.password 密钥存储文件中私钥的密码。 这对于客户端是可选的。 password null high
ssl.keystore.location 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向身份验证。 string null high
ssl.keystore.password 密钥存储文件的存储密码。 客户端是可选的,只有配置了ssl.keystore.location才需要。 password null high
ssl.truststore.location 信任存储文件的位置。 string null high
ssl.truststore.password 信任存储文件的密码。 password null high
connections.max.idle.ms 多少毫秒之后关闭空闲的连接。 long 540000 medium
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [0,…] medium
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 int 40000 [0,…] medium
sasl.jaas.config 用于JAAS配置文件的SASL连接的JAAS登录上下文参数格式。这里描述了JAAS配置文件的格式。该值的格式为:’ (=)*;’ password null medium
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。 可以在Kafka的JAAS配置或Kafka的配置中定义。 string null medium
sasl.mechanism 用户客户端连接的SASL机制。可以提供者任何安全机制。 GSSAPI是默认机制。 string GSSAPI medium
security.protocol 用于和broker通讯的策略。有效的值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 string PLAINTEXT medium
send.buffer.bytes 发送数据时使用TCP发送缓冲区(SO_SNDBUF)的大小。如果值为-1,则将使用OS默认。 int 131072 [-1,…] medium
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1 .1,TLSv1 medium
ssl.keystore.type 密钥存储文件的文件格式。 对于客户端是可选的。 string JKS medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任存储文件的文件格式。 string JKS medium
worker.sync.timeout.ms 当worker与其他worker不同步并需要重新同步配置时,需等待一段时间才能离开组,然后才能重新加入。 int 3000 medium
worker.unsync.backoff.ms 当worker与其他worker不同步,并且无法在worker.sync.timeout.ms 期间追赶上,在重新连接之前,退出Connect集群的时间。 int 300000 medium
access.control.allow.methods 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。 Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。 string “” low
access.control.allow.origin 将Access-Control-Allow-Origin标头设置为REST API请求。要启用跨源访问,请将其设置为应该允许访问API的应用程序的域,或者 *” 以允许从任何的。 默认值只允许从REST API的域访问。 string “” low
client.id 在发出请求时传递给服务器的id字符串。这样做的目的是通过允许逻辑应用程序名称包含在请求消息中,来跟踪请求来源。而不仅仅是ip/port string “” low
config.storage.replication.factor 当创建配置仓库topic时的副本数 short 3 [1,…] low
metadata.max.age.ms 在没有任何分区leader改变,主动地发现新的broker或分区的时间。 long 300000 [0,…] low
metric.reporters A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. list “” low
metrics.num.samples 保留计算metrics的样本数(译者不清楚是做什么的) int 2 [1,…] low
metrics.sample.window.ms The window of time a metrics sample is computed over. long 30000 [0,…] low
offset.flush.interval.ms 尝试提交任务偏移量的间隔。 long 60000 low
offset.flush.timeout.ms 在取消进程并恢复要在之后尝试提交的offset数据之前,等待消息刷新并分配要提交到offset仓库的offset数据的最大毫秒数。 long 5000 low
offset.storage.partitions 创建offset仓库topic的分区数 int 25 [1,…] low
offset.storage.replication.factor 创建offset仓库topic的副本数 short 3 [1,…] low
plugin.path 包含插件(连接器,转换器,转换)逗号(,)分隔的路径列表。该列表应包含顶级目录,其中包括以下任何组合:a)包含jars与插件及其依赖关系的目录 b)具有插件及其依赖项的uber-jars c)包含插件类的包目录结构的目录及其依赖关系,注意配置:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors list null low
reconnect.backoff.max.ms 无法连接broker时等待的最大时间(毫秒)。如果设置,则每个host的将会持续的增加,直到达到最大值。计算增加后,再增加20%的随机抖动,以避免高频的反复连接。 long 1000 [0,…] low
reconnect.backoff.ms 尝试重新连接到主机之前等待的时间。 避免了高频率反复的连接主机。 这种机制适用于消费者向broker发送的所有请求。 long 50 [0,…] low
rest.advertised.host.name 如果设置,其他wokers将通过这个hostname进行连接。 string null low
rest.advertised.port 如果设置,其他的worker将通过这个端口进行连接。 int null low
rest.host.name REST API的主机名。 如果设置,它将只绑定到这个接口。 string null low
rest.port 用于监听REST API的端口 int 8083 low
retry.backoff.ms 失败请求重新尝试之前的等待时间,避免了在某些故障的情况下,频繁的重复发送请求。 long 100 [0,…] low
sasl.kerberos.kinit.cmd Kerberos kinit命令路径. string /usr/bin/kinit low
sasl.kerberos.min.time.before.relogin 尝试refresh之间登录线程的休眠时间. long 60000 low
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将休眠,直到从上次刷新ticket到期,此时将尝试续订ticket。 double 0.8 low
ssl.cipher.suites 密码套件列表。用于TLS或SSL网络协议协商网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 末端识别算法使用服务器证书验证服务器主机名。 string null low
ssl.keymanager.algorithm 用于SSL连接的key管理工厂的算法,默认值是Java虚拟机配置的密钥管理工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 用于SSL连接的信任管理仓库算法。默认值是Java虚拟机配置的信任管理器工厂算法。 string PKIX low
status.storage.partitions 用于创建状态仓库topic的分区数 int 5 [1,…] low
status.storage.replication.factor 用于创建状态仓库topic的副本数 short 3 [1,…] low
task.shutdown.graceful.timeout.ms 等待任务正常关闭的时间,这是总时间,不是每个任务,所有任务触发关闭,然后依次等待。 long 5000 low

本文地址: https://github.com/maxzhao-it/blog/post/34489/

某天,不小心误删了 $HOME 路径下的数据。

网上找到了ext3grep尝试恢复;

安装

1
2
sudo pacman -S ext3grep

查看版本

1
ext3grep -v

输出下面信息

1
2
3
4
5
Running ext3grep version 0.10.2
ext3grep v0.10.2, Copyright (C) 2008 Carlo Wood.
ext3grep comes with ABSOLUTELY NO WARRANTY;
This program is free software; your freedom to use, change
and distribute this program is protected by the GPL.

查看命令

1
ext3grep --help 

查询要恢复文件的 inode 号

1
ext3grep /dev/sda5 --ls --inode 10092546

查询最大 inode

1
ls -id

失败

本文地址: https://github.com/maxzhao-it/blog/post/23026/

下载主题到 gnome-look

我这里下载的排名最高的 Tela grub

解压缩主题包

1
sudo tar -xf 主题包名 

安装主题

1、复制主题

1
2
3
sudo cp -r 主题包名 /boot/grub/themes/  
# 或者
sudo cp -r Tela /usr/share/grub/themes/

2、直接执行install.sh脚本

1
sudo sh Tela-2k/install.sh

修改配置文件

1
sudo vim /etc/grub.d/00_header

添加如下内容:

1
2
GRUB_THEME="/boot/grub/themes/主题包名/theme.txt"
GRUB_GFXMODE="1920x1080x32"

更新配置文件

1
sudo grub-mkconfig -o /boot/grub/grub.cfg

修改启动界面字体

1
2
# 这里是主题目录
vim /usr/share/grub/themes/Tela/theme.txt

修改其中的

1
item_font=""

改好保存就可以了。

本文地址: https://github.com/maxzhao-it/blog/post/48634/