数据实时同步
温馨提示
在这里,通过深入学习代码实现,您将系统地探索如何实现数据实时同步
功能的具体代码细节。
我们将以最简单直接
的方式为您呈现内容!
# 🥗 为什么要做数据同步
- 在业务实现过程中,我们为何需要实施数据同步?
在业务实现中,数据同步是非常重要的,主要有以下几个原因:
数据一致性:在分布式系统或者多节点环境中,数据同步可以确保各个节点或系统之间的数据保持一致。如果不同节点或系统之间的数据不一致,可能会导致业务逻辑混乱,甚至产生错误的结果。
故障恢复:当某个节点或系统出现故障时,其他节点或系统可以通过数据同步来恢复数据,保证业务的连续性。
负载均衡:在分布式系统中,数据同步可以帮助实现负载均衡。通过将数据同步到多个节点,可以分散请求压力,提高系统的整体性能。
业务扩展性:随着业务的发展,可能需要增加新的节点或系统来支持更多的业务。数据同步可以帮助新加入的节点或系统快速获取到需要的数据,从而顺利地参与到业务中。
数据备份与恢复:数据同步也可以作为数据备份的一种方式。通过定期同步数据到备份节点或系统,可以在原节点或系统出现故障时快速恢复数据。
安全性:数据同步可以帮助实现数据的冗余存储,即使某个节点或系统被攻击或损坏,数据也不会丢失。这有助于保护数据的安全性。
综上所述,数据同步在业务实现中起到了保障数据一致性
、故障恢复
、负载均衡
、业务扩展性
、数据备份与恢复
以及安全性
等多种重要作用。
# 🍝 数据同步方案
# 常见方案
常见的数据同步方案主要有以下四种:
同步双写:在将数据写到 MySQL 时,同时将数据写到 ES。优点是业务逻辑简单、实时性高;缺点是硬编码,有需要写入 MySQL 的地方都需要添加写入 ES 的代码,业务强耦合,存在双写失败丢数据风险,性能较差。
异步双写:针对多数据源写入的场景,可以借助 MQ 实现异步的多源写入。优点是性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。
基于 SQL 抽取:通过定时器程序按一定的时间周期扫描指定的表,把该时间段内发生变化的数据提取出来,逐条写入到 ES 中。优点是不改变原来代码,没有侵入性、没有硬编码;缺点是时效性较差,对数据库有一定的轮询压力。
基于 Binlog 实时同步:利用 MySQL 的 Binlog 来进行同步。具体步骤包括读取 MySQL 的 Binlog 日志,获取指定表的日志信息;将读取的信息转为 MQ;编写一个 MQ 消费程序;不断消费 MQ,每消费完一条消息,将消息写入到 ES 中。优点是没有代码侵入、没有硬编码;原有系统不需要任何变化,没有感知;性能高;业务解耦,不需要关注原来系统的业务逻辑。缺点是构建 Binlog 系统复杂;如果采用 MQ 消费解析的 Binlog 信息,也会像方案二一样存在 MQ 延时的风险。
# 全量同步和增量同步
增量同步和全量同步是数据同步的两种常见方式。
增量同步,顾名思义,只同步发生变化的数据。这种方式的优点是可以大大减少数据传输量,提高同步效率,特别是在数据量大且变化频繁的场景下非常有用。但是,它的缺点是需要记录数据的变化情况,实现起来相对复杂。
全量同步则是将全部数据都进行同步,无论数据是否发生变化。这种方式的优点是实现简单,易于理解。但是,当数据量非常大时,全量同步会消耗大量的网络和存储资源,同步效率较低。
在实际使用中,需要根据具体的业务需求和场景来选择合适的同步方式。例如,对于数据变化不频繁,且对数据一致性要求不高的业务,可以选择全量同步;而对于数据变化频繁,需要实时反映业务状态的业务,则更适合使用增量同步。
# ☕ Logstash 数据同步管道
接下来,我们将构建一个基础的示例,以实际演示如何利用 Logstash 的数据传输管道,确保 MySQL 和 Elasticsearch 之间的数据一致性。我们将通过这一过程,深入理解 Logstash 在实现数据库与搜索引擎之间的数据同步中的关键作用。
推荐阅读
Getting Started with Logstash | Logstash Reference [7.17] | Elastic (opens new window)
Running Logstash on Windows | Logstash Reference [7.17] | Elastic (opens new window)
传输 和 处理 数据的管道
- 好处:用起来方便,插件多
- 缺点:成本更大、一般要配合其他组件使用(比如 kafka)
本质上就是把编程式同步改为配置式同步,更加方便快捷
# 下载安装
Download Logstash Free | Get Started Now | Elastic (opens new window)
# demo 测试
[Stashing Your First Event | Logstash Reference 7.17] | Elastic (opens new window)
我们根据官网指引,可以找到这么一段测试代码:
logstash.bat -e "input { stdin { } } output { stdout {} }"
- 在bin 目录下执行这段代码(可以理解为:指定输入输出配置均为默认,开启 Logstash)
待启动完成后,随便输入内容,如果在命令行中有返回相同内容,则测试成功
- 如图所示:
# 自定义配置
快速开始:[Running Logstash on Windows | Logstash Reference 7.17] | Elastic (opens new window)(2023/09/22 晚)
- 在官方文档中,找到这一段简单的示例配置:
- 将这段配置粘贴进 config 下的 **logstash-sample.conf 配置文件(可以保留该原文件,复制一份重命名)**中:
- 这几行配置是干什么的呢?简单来讲就是定义了输入和输出:监听 UDP,并输出
- 按官方文档的操作来,尝试加载这个配置文件 并 启动 Logstash:
.\bin\logstash.bat -f .\config\myTask.conf
[Running Logstash on Windows | Logstash Reference 7.17] | Elastic (opens new window)
运行这行命令,可以看到 Logstash 成功启动运行了
# 同步 MySQL
[Jdbc input plugin | Logstash Reference 7.17] | Elastic (opens new window)
在官方文档中,找到这段配置,用来把 input 输入与 MySQL 数据库中的数据同步(2023/09/22 晚)
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
}
这些配置是不是很眼熟?我们简单说明一下:
- jdbc_driver_library:就是加载 MySQL 数据库的 jar 包(依赖)
- 接下来的四行配置不用多说,连接MySQL 的驱动、对应数据库、用户名、密码
- statement:SQL 表达式,用来从 MySQL 中获取数据
- parameters:起到动态配置 SQL 语句中的参数的作用
- schedule:Cron 表达式,实现定时查询
我们按自己实际的的需求,可以简单地修改配置
- 当然了,如果我们我们现在加载此配置、启动 Logstash,一定会报错,如图所示:
原因很简单,就是配置中的 mysql jar 包找不到,我们需要自己配置一个 mysql jar 包,并正确配置它的路径
这里有个技巧:在 IDEA 中找到项目所依赖的 jar 包
- 如图所示,选择对应的依赖后,可以直接在文件管理器中打开
- 然后直接在文件管理器中复制,粘贴到 Logstash 目录下即可
加载配置、启动 Logstash,启动成功了:
聊聊我在这段配置上踩过的坑吧:
- mysql jar 包路径外层多加了一层双引号
- 用户名、密码配置错误
- SQL 语句中 where 多写了一个
- timestamp 写成 timestampe
这段配置绝对不能出现任何问题,否则就会出现严重的报错。我的最终配置是这样的:
# Sample Logstash configuration for receiving
# UDP syslog messages over port 514
input {
jdbc {
jdbc_driver_library => "D:\softWare\logstash\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/memory_search"
jdbc_user => "root"
jdbc_password => "Dw990831"
statement => "SELECT * from post where 1 = 1"
schedule => "*/5 * * * *"
}
}
output {
stdout { codec => rubydebug }
}
启动成功后,现在的 Logstash 是每 5 秒从数据库中同步所有数据(当然这是根据 SQL 语句来执行的),这数据量可能会很大
这就是全量同步了,我们不需要同步所有数据,我们可以选择同步最近更新的数据
添加如下配置:
statement => "SELECT * from post where updateTime > :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updateTime"
schedule => "*/5 * * * * *"
jdbc_default_timezone => "Asia/Shanghai"
这段配置就是根据 updateTime 字段的最新值,同步 updateTime 大于最新值的数据:
- 所以说,sql_last_value 指定的是上次查到的数据的最后一行的指定字段,新的查询就是比较这个指定字段与 sql_last_value 的大小
- 但是经过多次查询发现,这里的 sql_last_value 始终不变
- 我们可以在 data\plugins\inputs\jdbc\logstash_jdbc_last_run 看到 sql_last_value 指定的数据,确实没有变化:
- 将 tracking_column => "updateTime" 的 updateTime 修改为 updatetime,日期同步成功
更新下数据库中的最新值,再看看效果,确实拿到了数据库中最新修改的值(参照上次修改后的最新值):
# 同步 ES
调试这么久,Logstash 能够正常同步 MySQL 了,接下来就是把同步到 input 的数据,同步到 ES 中了**(2023/09/22 晚)**
直接在官方文档中,找到输出 output 的相关配置:
- 跟着官网简单的 demo 学就行,配置过一次就会了,这是我完成同步 ES 后的配置:(部分私密信息已做处理)
# Sample Logstash configuration for receiving
# UDP syslog messages over port 514
input {
jdbc {
jdbc_driver_library => "D:\softWare\logstash\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/"******""
jdbc_user => "******"
jdbc_password => ""******""
statement => "SELECT * from post where updateTime > :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "updatetime"
schedule => "*/5 * * * * *"
jdbc_default_timezone => "Asia/Shanghai"
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "127.0.0.1:9200"
index => "post_v1"
document_id => "%{id}"
}
}
这里简单介绍下这几个配置的作用:
host:标识要进行同步的 ES 地址,即指定了:数据从 MySQL 中取出后,发送到哪
index:目标索引
document_id:指定目标索引内,每一个文档的 id,就是从 SELECT * 中解构出 id 值
data_stream:特殊的数据格式,我们从数据库中取到的都是普通类型,不需要这行配置
其他的目前暂且不需要了解,日后再进一步学习
加载配置,运行 Logstash,可以看到运行成功了,数据库中最新更新的数据也成功同步到了本地的 ES 上了:
从同步结果来看,我们还需要解决几个问题:
- 排除某些不需要同步的字段
- ES 中同步过来的文档数据字段都是全小写,不是驼峰式
- 查询结果按 updateTime 降序排列,避免重排序,导致多同步了不必要的数据,造成性能浪费
解决这三个问题当然很简单:
首先修改下 SQL 语句:
statement => "SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc"
再写入如下过滤配置,将对应字段进行驼峰式转换,并排除不需要的字段:
filter {
mutate {
rename => {
"updatetime" => "updateTime"
"userid" => "userId"
"createtime" => "createTime"
"isdelete" => "isDelete"
}
remove_field => ["thumbnum", "favournum"]
}
}
重新进行同步,结果完美,顺利完成:
# 🍚 Canal 监控数据库流水
# 基础配置
🥣 推荐阅读:
我们简单实现通过 Canal 监控数据库流水,实时监听 MySQL 中的数据变更,并实时同步变更的数据到 Elasticsearch 中
# 本地数据库配置
- 创建新用户:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
- 在本地 MySQL 中的 my.ini 文件中做如下配置(将本地的 MySQL 作为一个主节点,开启本地 binlog 生成):
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
# Canal 的下载安装
# Canal 配置
- 在
\conf\example\instance.properties
目录下做如下配置:
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 启动 Canal
- 在
bin
目录下,输入以下命令启动:
startup.bat
# 🥣 实战操作
导入相关依赖:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
- 我们拉取官网提供的 demo 代码,直接启动:
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
本地数据库修改数据,测试 Canal 监控数据库流水
- 如下,我们修改本地数据库中的一条记录,发现这次数据变更已经被捕捉到并打印出来了
- 至此,使用 Canal 实现实时监控数据变更就完成了,当然这只是简单测试了一下,日后会着手实现基于该方法同步 MySQL 中的变更数据到 ES 中