SpringBoot系列之canal和kafka实现异步实时更新
实验开发环境
JDK 1.8
SpringBoot2.2.1
Maven 3.2+
开发工具
IntelliJ IDEA
smartGit
canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave,向master发送dump 协议,获取到数据后,解析 binary log 对象数据。
本博客基于window系统的,linux系统的可以参考quickstart,比较详细。
使用canal需要确保数据库开启了binlog:
show variables like'log_%';
如果没开启,在mysql my.ini配置文件添加配置,注意文件内存为的时候,注意编码格式必须为ANSI,不然会编译报错
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1
配置文件修改是否正确,使用命令,查看日志
mysqld --console
重启MySQL实例
net stop mysql
net start mysql
binlog开启后,创建一个canal用户并授权,官网配置是@%
,表示所有服务器,因为本地测试的,所以改为localhost
就可以
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' identified by 'canal';
FLUSH PRIVILEGES;
下载canal服务端,到官网releases下载对应资料,canal.deployer-1.1.6-SNAPSHOT
是服务端,在conf文件夹里找到\example\instance.properties
,修改数据库配置信息,dbUsername,dbPassword数据库账号密码
到canal服务器安装目录D:\canal\canal.deployer-1.1.6-SNAPSHOT\bin
,找到startup.bat
执行
官网下载链接:https://kafka.apache.org/downloads,最开始用最新版的,发现在我的win10系统没部署成功,所以还是选择2.8.1版本的
在D:\kafka_2.12-2.8.1\bin\windows,使用cmd命令启动zookeeper,window系统修改conf文件夹下面的zookeeper.properties里面的dataDir
zookeeper-server-start.bat ..\..\config\zookeeper.properties
window系统修改conf文件夹下面的log.dirs路径
kafka-server-start.bat ..\..\config\server.properties
使用canal监听mysql数据库里的binlog,一旦修改了order订单表,也就是下单成功,就讲订单数据通过kafka做异步处理,将订单数据同步到仓库系统(kafka消费者)做业务处理,仓库商品的数据更新等业务。
创建一个工程,实现对kafka的api简单封装
jdk选择jdk8的
选择需要的依赖
基于kafka的EventPublisher
package com.example.ebus.publisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.kafka.core.KafkaTemplate;
@Slf4j
public class MyEventPublisher {
private KafkaTemplate<String, Object> kafkaTemplate;
@Value("${app.ebus.topic:ebus}")
private String topic;
public MyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publishEvent(Object event) {
if (log.isInfoEnabled()) {
log.info("topic发送:{}", event.getClass().getName());
}
kafkaTemplate.send(topic, event);
}
}
自动配置类
package com.example.ebus.configuration;
import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
@Configuration
public class EbusAutoConfiguration {
@Bean
public MyEventPublisher myEventPublisher(@Qualifier("kafkaTemplate") KafkaTemplate<String, Object> kafkaTemplate) {
return new MyEventPublisher(kafkaTemplate);
}
}
META-INF/spring.factories,加上配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.example.ebus.configuration.EbusAutoConfiguration
实现canal进行mysql binlog的监听,然后
新建SpringBoot工程,使用阿里的脚手架,网速比较快
jdk使用1.8的
加上一些其它的配置
在pom文件加上canal客户端的配置:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
加上starter配置:
<dependency>
<groupId>com.example.ebus</groupId>
<artifactId>springboot-ebus-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
加上kafka配置:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
canal进行binlog监听,订单表新增数据就发生信息
package com.example.ali.canal.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.ebus.event.ShopOrderEvent;
import com.example.ebus.publisher.MyEventPublisher;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
@Component
public class AliCanalClient implements ApplicationRunner {
@Autowired
private MyEventPublisher eventPublisher;
private static final int batchSize = 1;
@Override
public void run(ApplicationArguments args) throws Exception {
// 创建canal连接器
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "canal", "canal");
try {
// 连接canal服务端
connector.connect();
// 只订阅*order的表,订阅所有表:".*\\..*"
connector.subscribe(".*order.*");
// 回滚到未进行ack确认的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
// 获取批量ID
long batchId = message.getId();
// 获取批量的数量
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
//如果没有数据,线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
}
} else {
//如果有数据,处理数据
handle(message.getEntries());
}
// ack确认batchId。小于等于这个batchId的消息都会被确认
connector.ack(batchId);
}
} finally {
// 释放连接
connector.disconnect();
}
}
private void handle(List<CanalEntry.Entry> entries) {
entries.stream().forEach(entry ->{
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
// 开启/关闭事务的实体类型,跳过
return;
}
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
try {
// 获取rowChange
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 针对新增操作的监听
if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
// 遍历rowChange里的所有的行数据
rowChange.getRowDatasList().stream().forEach((row->{
row.getAfterColumnsList().stream().forEach(column->{
publishEvent(row);
});
}));
}
}catch (InvalidProtocolBufferException e) {
throw new RuntimeException("解释Binlog日志出现异常:" + entry, e);
}
}
});
}
private void publishEvent(CanalEntry.RowData rowData) {
ShopOrderEvent orderEvent = new ShopOrderEvent();
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
columns.forEach((column -> {
String name = column.getName();
String value = column.getValue();
Optional.ofNullable(value).ifPresent((v)->{
if ("orderCode".equals(name)) {
orderEvent.setOrderCode(v);
}
if ("productName".equals(name)) {
orderEvent.setProductName(v);
}
if ("price".equals(name)) {
orderEvent.setPrice(Float.valueOf(v));
}
if ("productDesc".equals(name)) {
orderEvent.setProductDesc(v);
}
if ("isOk".equals(name)) {
orderEvent.setIsOk(Integer.valueOf(v));
}
});
}));
eventPublisher.publishEvent(orderEvent);
}
}
同样创建一个消费者工程,配置:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
enable-auto-commit: true
group-id: consumer1
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
max-poll-records: 1
properties:
spring:
json:
trusted:
packages: '*'
进行监听,使用kafka的KafkaListener
package com.example.consumer.handler;
import com.example.ebus.event.ShopOrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class OrderListenerHandler {
@KafkaListener(topics = {"${app.ebus.topic:ebus}"})
public void obtainTopicData(ShopOrderEvent event) {
log.info("下单成功,orderCode:{}" , event.getOrderCode());
// 业务处理
}
}
ok,测试的可以,新增一些表数据,就可以看到日志打印,本博客代码例子可以在GitHub找到下载链接
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://smilenicky.blog.csdn.net/article/details/122211056
内容来源于网络,如有侵权,请联系作者删除!