KafkaConnect导入导出数据
一、Kafka Connect 导入/导出 数据
从控制台写入和写回数据是一个方便的开始,但你可能想要从其他来源导入或导出数据到其他系统。对于大多数系统,可以使用kafka Connect
,而不需要编写自定义集成代码。
Kafka Connect
是导入和导出数据的一个工具。是一个在 kafka
与其他系统之间的可扩展和可靠的流数据工具。它使得快速定义连接器变得非常简单,这些连接器可以将大量数据移入和移出Kafka
。Kafka Connect
可以摄取整个数据库或者从你所有的应用服务器上收集指标到Kafka主题中,使数据可以用于低延迟的流处理。导出作业可以将数据从Kafka
主题交付到二级存储和查询系统,或者交付到批处理系统以进行离线分析。
Kafka Connect功能包括:
- Kafka Connect是Kafka连接器的通用框架——Kafka Connect标准化了其他数据系统与Kafka的集成,简化了连接器的开发、部署和管理
- 分布式和独立模式——向上扩展到支持整个组织的大型集中管理服务,或者向下扩展到开发、测试和小型生产部署
- 通过一个易于使用的REST API向Kafka Connect集群提交和管理连接器
- 自动偏移量管理——Kafka Connect可以自动管理偏移量提交过程,所以连接器开发人员不需要担心连接器开发中这个容易出错的部分
- Kafka Connect建立在现有的组管理协议上。可以添加更多的worker来扩展Kafka Connect集群。
- 流/批处理集成-利用Kafka现有的功能,Kafka Connect是一个理想的解决方案桥接流和批处理数据系统
一些概念
kafka connector
:是kafka connect
的关键组成部分,它是一个逻辑上的job,用于在kafka和其他系统之间拷贝数据,比如:从上游系统拷贝数据到kafka,或者从kafka拷贝数据到下游系统Tasks:每个
kafka connector
可以初始化一组task进行数据的拷贝Workers:逻辑上包含
kafka connector
和tasks用来调度执行具体任务的进程,具体执行时分为standalone模式和distributed模式
下面介绍配置、运行和管理Kafka Connect。
二、运行 Kafka Connect
Kafka Connect目前支持两种执行模式:单机(单进程)和分布式。
在独立模式下,所有工作都在单个进程中执行。这种配置更容易设置和开始,可能在只有一个worker有意义的情况下有用(例如收集日志文件),但它没有从Kafka Connect的一些特性中受益,比如容错。可以使用如下命令启动独立进程:
1 | bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...] |
单例配置
配置文件在 config/connect-standalone.properties
首先是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需的任何其他配置。
1 | # 连接器的唯一名称。再次尝试注册相同名称将会失败。 |
控制台操作
在这个快速入门里,我们将看到如何运行Kafka Connect
用简单的连接器从文件导入数据到Kafka主题,再从Kafka
主题导出数据到文件。
首先,我们首先创建一些种子数据用来测试:
1 | echo -e "foo\nbar" > test.txt |
接下来,我们开始2个连接器运行在独立的模式,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。
1 | bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties |
这些包含在Kafka中的示例配置文件使用之前启动的默认本地群集配置,并创建两个连接器: 第一个是源连接器,用于从输入文件读取行,并将其输入到 Kafka topic。 第二个是接收器连接器,它从Kafka
topic中读取消息,并在输出文件中生成一行。
在启动过程中,你会看到一些日志消息,包括一些连接器正在实例化的指示。 一旦Kafka Connect进程启动,源连接器就开始从test.txt
读取行并且 将它们生产到主题connect-test
中,同时接收器连接器也开始从主题connect-test
中读取消息, 并将它们写入文件test.sink.txt
中。我们可以通过检查输出文件的内容来验证数据是否已通过整个pipeline进行交付:
1 | more test.sink.txt |
出现结果
1 | foo |
请注意,数据存储在Kafka topicconnect-test
中,因此我们也可以运行一个console consumer(控制台消费者)来查看 topic 中的数据(或使用custom
consumer(自定义消费者)代码进行处理):
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning |
结果:
1 | {"schema":{"type":"string","optional":false},"payload":"foo"} |
连接器一直在处理数据,所以我们可以将数据添加到文件中,并看到它在pipeline 中移动:
1 | # 追加一行数据 |
应该可以看到这一行出现在控制台用户输出和接收器文件中。
三、MySql
操作
https://www.pianshen.com/article/3877182847/
四、详细配置
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
config.storage.topic | kafka topic仓库配置 | string | high | ||
group.id | 唯一的字符串,用于标识此worker所属的Connect集群组。 | string | high | ||
key.converter | 用于Kafka Connect和写入到Kafka的序列化消息的之间格式转换的转换器类。 这可以控制写入或从kafka读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 | class | high | ||
offset.storage.topic | 连接器的offset存储到哪个topic中 | string | high | ||
status.storage.topic | 追踪连接器和任务状态存储到哪个topic中 | string | high | ||
value.converter | 用于Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 | class | high | ||
internal.key.converter | 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 这可以控制写入或从Kafka读取的消息中的key的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 | class | low | ||
internal.value.converter | 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 | class | low | ||
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口列表。此列表用来发现完整服务器集的初始主机。 该列表的格式应为host1:port1,host2:port2,….由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此,不需要包含完整的服务器(尽管如此,你需要多配置几个,以防止配置的宕机)。 | list | localhost:9092 | high | |
heartbeat.interval.ms | 心跳间隔时间。心跳用于确保会话保持活动,并在新成员加入或离开组时进行重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 | int | 3000 | high | |
rebalance.timeout.ms | 限制所有组中消费者的任务处理数据和提交offset所需的时间。如果超时,那么woker将从组中删除,这也将导致offset提交失败。 | int | 60000 | high | |
session.timeout.ms | 用于察觉worker故障的超时时间。worker定时发送心跳以表明自己是活着的。如果broker在会话超时时间到期之前没有接收到心跳,那么broker将从分组中移除该worker,并启动重新平衡。注意,该值必须在group.min.session.timeout.ms 和group.max.session.timeout.ms 范围内。 |
int | 10000 | high | |
ssl.key.password | 密钥存储文件中私钥的密码。 这对于客户端是可选的。 | password | null | high | |
ssl.keystore.location | 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向身份验证。 | string | null | high | |
ssl.keystore.password | 密钥存储文件的存储密码。 客户端是可选的,只有配置了ssl.keystore.location才需要。 | password | null | high | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | high | |
ssl.truststore.password | 信任存储文件的密码。 | password | null | high | |
connections.max.idle.ms | 多少毫秒之后关闭空闲的连接。 | long | 540000 | medium | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 32768 | [0,…] | medium |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 | int | 40000 | [0,…] | medium |
sasl.jaas.config | 用于JAAS配置文件的SASL连接的JAAS登录上下文参数格式。这里描述了JAAS配置文件的格式。该值的格式为:’ (=)*;’ | password | null | medium | |
sasl.kerberos.service.name | Kafka运行的Kerberos principal名称。 可以在Kafka的JAAS配置或Kafka的配置中定义。 | string | null | medium | |
sasl.mechanism | 用户客户端连接的SASL机制。可以提供者任何安全机制。 GSSAPI是默认机制。 | string | GSSAPI | medium | |
security.protocol | 用于和broker通讯的策略。有效的值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 | string | PLAINTEXT | medium | |
send.buffer.bytes | 发送数据时使用TCP发送缓冲区(SO_SNDBUF)的大小。如果值为-1,则将使用OS默认。 | int | 131072 | [-1,…] | medium |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1 .1,TLSv1 | medium | |
ssl.keystore.type | 密钥存储文件的文件格式。 对于客户端是可选的。 | string | JKS | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 | string | TLS | medium | |
ssl.provider | 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 | string | null | medium | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | medium | |
worker.sync.timeout.ms | 当worker与其他worker不同步并需要重新同步配置时,需等待一段时间才能离开组,然后才能重新加入。 | int | 3000 | medium | |
worker.unsync.backoff.ms | 当worker与其他worker不同步,并且无法在worker.sync.timeout.ms 期间追赶上,在重新连接之前,退出Connect集群的时间。 | int | 300000 | medium | |
access.control.allow.methods | 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。 Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。 | string | “” | low | |
access.control.allow.origin | 将Access-Control-Allow-Origin标头设置为REST API请求。要启用跨源访问,请将其设置为应该允许访问API的应用程序的域,或者 *” 以允许从任何的域 。 默认值只允许从REST API的域访问。 |
string | “” | low | |
client.id | 在发出请求时传递给服务器的id字符串。这样做的目的是通过允许逻辑应用程序名称包含在请求消息中,来跟踪请求来源。而不仅仅是ip/port | string | “” | low | |
config.storage.replication.factor | 当创建配置仓库topic时的副本数 | short | 3 | [1,…] | low |
metadata.max.age.ms | 在没有任何分区leader改变,主动地发现新的broker或分区的时间。 | long | 300000 | [0,…] | low |
metric.reporters | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | list | “” | low | |
metrics.num.samples | 保留计算metrics的样本数(译者不清楚是做什么的) | int | 2 | [1,…] | low |
metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,…] | low |
offset.flush.interval.ms | 尝试提交任务偏移量的间隔。 | long | 60000 | low | |
offset.flush.timeout.ms | 在取消进程并恢复要在之后尝试提交的offset数据之前,等待消息刷新并分配要提交到offset仓库的offset数据的最大毫秒数。 | long | 5000 | low | |
offset.storage.partitions | 创建offset仓库topic的分区数 | int | 25 | [1,…] | low |
offset.storage.replication.factor | 创建offset仓库topic的副本数 | short | 3 | [1,…] | low |
plugin.path | 包含插件(连接器,转换器,转换)逗号(,)分隔的路径列表。该列表应包含顶级目录,其中包括以下任何组合:a)包含jars与插件及其依赖关系的目录 b)具有插件及其依赖项的uber-jars c)包含插件类的包目录结构的目录及其依赖关系,注意配置:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors | list | null | low | |
reconnect.backoff.max.ms | 无法连接broker时等待的最大时间(毫秒)。如果设置,则每个host的将会持续的增加,直到达到最大值。计算增加后,再增加20%的随机抖动,以避免高频的反复连接。 | long | 1000 | [0,…] | low |
reconnect.backoff.ms | 尝试重新连接到主机之前等待的时间。 避免了高频率反复的连接主机。 这种机制适用于消费者向broker发送的所有请求。 | long | 50 | [0,…] | low |
rest.advertised.host.name | 如果设置,其他wokers将通过这个hostname进行连接。 | string | null | low | |
rest.advertised.port | 如果设置,其他的worker将通过这个端口进行连接。 | int | null | low | |
rest.host.name | REST API的主机名。 如果设置,它将只绑定到这个接口。 | string | null | low | |
rest.port | 用于监听REST API的端口 | int | 8083 | low | |
retry.backoff.ms | 失败请求重新尝试之前的等待时间,避免了在某些故障的情况下,频繁的重复发送请求。 | long | 100 | [0,…] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径. | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | 尝试refresh之间登录线程的休眠时间. | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到从上次刷新ticket到期,此时将尝试续订ticket。 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。用于TLS或SSL网络协议协商网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 | list | null | low | |
ssl.endpoint.identification.algorithm | 末端识别算法使用服务器证书验证服务器主机名。 | string | null | low | |
ssl.keymanager.algorithm | 用于SSL连接的key管理工厂的算法,默认值是Java虚拟机配置的密钥管理工厂算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | low | |
ssl.trustmanager.algorithm | 用于SSL连接的信任管理仓库算法。默认值是Java虚拟机配置的信任管理器工厂算法。 | string | PKIX | low | |
status.storage.partitions | 用于创建状态仓库topic的分区数 | int | 5 | [1,…] | low |
status.storage.replication.factor | 用于创建状态仓库topic的副本数 | short | 3 | [1,…] | low |
task.shutdown.graceful.timeout.ms | 等待任务正常关闭的时间,这是总时间,不是每个任务,所有任务触发关闭,然后依次等待。 | long | 5000 | low |