MemorySearch 开发者文档 MemorySearch 开发者文档
首页
    • 概述
    • 系统设计
    • 维护升级
    • 高效多元搜索
    • 互动创作平台
    • 流量统计分析
    • 个人中心管理
    • 资源全面管理
    • 概览
    • Ant Design Vue 脚手架
    • Vuepress 静态文档站点
    • 定制前端项目初始模板
    • 基础信息管理
    • 高效多元搜索
    • Elastic Stack 全家桶
    • 设计模式荟萃
    • 外源数据抓取
    • 数据实时同步
    • 流量速率管控
    • 缓存性能调优
    • 定时任务调度
    • 权限校验机制
    • 异步编程支持
    • 初始模板定制
    • 全局逻辑梳理
  • 简介
  • 常见问题与解答
首页
    • 概述
    • 系统设计
    • 维护升级
    • 高效多元搜索
    • 互动创作平台
    • 流量统计分析
    • 个人中心管理
    • 资源全面管理
    • 概览
    • Ant Design Vue 脚手架
    • Vuepress 静态文档站点
    • 定制前端项目初始模板
    • 基础信息管理
    • 高效多元搜索
    • Elastic Stack 全家桶
    • 设计模式荟萃
    • 外源数据抓取
    • 数据实时同步
    • 流量速率管控
    • 缓存性能调优
    • 定时任务调度
    • 权限校验机制
    • 异步编程支持
    • 初始模板定制
    • 全局逻辑梳理
  • 简介
  • 常见问题与解答
  • 内容概览

    • 概览
  • 前端

    • Ant Design Vue 脚手架
    • Vuepress 静态文档站点
    • 定制前端项目初始模板
  • 后端

    • 基础信息管理
    • 高效多元搜索
    • Elastic Stack 全家桶
    • 设计模式荟萃
    • 外源数据抓取
    • 数据实时同步
      • 🥗 为什么要做数据同步
      • 🍝 数据同步方案
        • 常见方案
        • 全量同步和增量同步
      • ☕ Logstash 数据同步管道
        • 下载安装
        • demo 测试
        • 自定义配置
        • 同步 MySQL
        • 同步 ES
      • 🍚 Canal 监控数据库流水
        • 基础配置
        • 本地数据库配置
        • Canal 的下载安装
        • Canal 配置
        • 启动 Canal
      • 🥣 实战操作
    • 流量速率管控
    • 缓存性能调优
    • 定时任务调度
    • 权限校验机制
    • 异步编程支持
    • 初始模板定制
    • 全局逻辑梳理
目录

数据实时同步

温馨提示

在这里,通过深入学习代码实现,您将系统地探索如何实现数据实时同步功能的具体代码细节。

我们将以最简单直接的方式为您呈现内容!

# 🥗 为什么要做数据同步

  • 在业务实现过程中,我们为何需要实施数据同步?

在业务实现中,数据同步是非常重要的,主要有以下几个原因:

  • 数据一致性:在分布式系统或者多节点环境中,数据同步可以确保各个节点或系统之间的数据保持一致。如果不同节点或系统之间的数据不一致,可能会导致业务逻辑混乱,甚至产生错误的结果。

  • 故障恢复:当某个节点或系统出现故障时,其他节点或系统可以通过数据同步来恢复数据,保证业务的连续性。

  • 负载均衡:在分布式系统中,数据同步可以帮助实现负载均衡。通过将数据同步到多个节点,可以分散请求压力,提高系统的整体性能。

  • 业务扩展性:随着业务的发展,可能需要增加新的节点或系统来支持更多的业务。数据同步可以帮助新加入的节点或系统快速获取到需要的数据,从而顺利地参与到业务中。

  • 数据备份与恢复:数据同步也可以作为数据备份的一种方式。通过定期同步数据到备份节点或系统,可以在原节点或系统出现故障时快速恢复数据。

  • 安全性:数据同步可以帮助实现数据的冗余存储,即使某个节点或系统被攻击或损坏,数据也不会丢失。这有助于保护数据的安全性。

综上所述,数据同步在业务实现中起到了保障数据一致性、故障恢复、负载均衡、业务扩展性、数据备份与恢复以及安全性等多种重要作用。

# 🍝 数据同步方案

# 常见方案

常见的数据同步方案主要有以下四种:

  • 同步双写:在将数据写到 MySQL 时,同时将数据写到 ES。优点是业务逻辑简单、实时性高;缺点是硬编码,有需要写入 MySQL 的地方都需要添加写入 ES 的代码,业务强耦合,存在双写失败丢数据风险,性能较差。

  • 异步双写:针对多数据源写入的场景,可以借助 MQ 实现异步的多源写入。优点是性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。

  • 基于 SQL 抽取:通过定时器程序按一定的时间周期扫描指定的表,把该时间段内发生变化的数据提取出来,逐条写入到 ES 中。优点是不改变原来代码,没有侵入性、没有硬编码;缺点是时效性较差,对数据库有一定的轮询压力。

  • 基于 Binlog 实时同步:利用 MySQL 的 Binlog 来进行同步。具体步骤包括读取 MySQL 的 Binlog 日志,获取指定表的日志信息;将读取的信息转为 MQ;编写一个 MQ 消费程序;不断消费 MQ,每消费完一条消息,将消息写入到 ES 中。优点是没有代码侵入、没有硬编码;原有系统不需要任何变化,没有感知;性能高;业务解耦,不需要关注原来系统的业务逻辑。缺点是构建 Binlog 系统复杂;如果采用 MQ 消费解析的 Binlog 信息,也会像方案二一样存在 MQ 延时的风险。

# 全量同步和增量同步

增量同步和全量同步是数据同步的两种常见方式。

  • 增量同步,顾名思义,只同步发生变化的数据。这种方式的优点是可以大大减少数据传输量,提高同步效率,特别是在数据量大且变化频繁的场景下非常有用。但是,它的缺点是需要记录数据的变化情况,实现起来相对复杂。

  • 全量同步则是将全部数据都进行同步,无论数据是否发生变化。这种方式的优点是实现简单,易于理解。但是,当数据量非常大时,全量同步会消耗大量的网络和存储资源,同步效率较低。

在实际使用中,需要根据具体的业务需求和场景来选择合适的同步方式。例如,对于数据变化不频繁,且对数据一致性要求不高的业务,可以选择全量同步;而对于数据变化频繁,需要实时反映业务状态的业务,则更适合使用增量同步。

# ☕ Logstash 数据同步管道

接下来,我们将构建一个基础的示例,以实际演示如何利用 Logstash 的数据传输管道,确保 MySQL 和 Elasticsearch 之间的数据一致性。我们将通过这一过程,深入理解 Logstash 在实现数据库与搜索引擎之间的数据同步中的关键作用。

推荐阅读

Getting Started with Logstash | Logstash Reference [7.17] | Elastic (opens new window)

Running Logstash on Windows | Logstash Reference [7.17] | Elastic (opens new window)

传输 和 处理 数据的管道

  • 好处:用起来方便,插件多
  • 缺点:成本更大、一般要配合其他组件使用(比如 kafka)

本质上就是把编程式同步改为配置式同步,更加方便快捷

# 下载安装

Download Logstash Free | Get Started Now | Elastic (opens new window)

# demo 测试

[Stashing Your First Event | Logstash Reference 7.17] | Elastic (opens new window)

image-20230922205535009

我们根据官网指引,可以找到这么一段测试代码:

logstash.bat -e "input { stdin { } } output { stdout {} }"
  • 在bin 目录下执行这段代码(可以理解为:指定输入输出配置均为默认,开启 Logstash)

待启动完成后,随便输入内容,如果在命令行中有返回相同内容,则测试成功

  • 如图所示:

image-20230922205958186

# 自定义配置

快速开始:[Running Logstash on Windows | Logstash Reference 7.17] | Elastic (opens new window)(2023/09/22 晚)

  • 在官方文档中,找到这一段简单的示例配置:

image-20230922210307761

  • 将这段配置粘贴进 config 下的 **logstash-sample.conf 配置文件(可以保留该原文件,复制一份重命名)**中:
  • 这几行配置是干什么的呢?简单来讲就是定义了输入和输出:监听 UDP,并输出

image-20230922210525309

  • 按官方文档的操作来,尝试加载这个配置文件 并 启动 Logstash:
.\bin\logstash.bat -f .\config\myTask.conf

[Running Logstash on Windows | Logstash Reference 7.17] | Elastic (opens new window)

image-20230922211016754

运行这行命令,可以看到 Logstash 成功启动运行了

image-20230922211330283

# 同步 MySQL

[Jdbc input plugin | Logstash Reference 7.17] | Elastic (opens new window)

在官方文档中,找到这段配置,用来把 input 输入与 MySQL 数据库中的数据同步(2023/09/22 晚)

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}

这些配置是不是很眼熟?我们简单说明一下:

  • jdbc_driver_library:就是加载 MySQL 数据库的 jar 包(依赖)
  • 接下来的四行配置不用多说,连接MySQL 的驱动、对应数据库、用户名、密码
  • statement:SQL 表达式,用来从 MySQL 中获取数据
  • parameters:起到动态配置 SQL 语句中的参数的作用
  • schedule:Cron 表达式,实现定时查询

我们按自己实际的的需求,可以简单地修改配置

  • 当然了,如果我们我们现在加载此配置、启动 Logstash,一定会报错,如图所示:

image-20230922213346119

原因很简单,就是配置中的 mysql jar 包找不到,我们需要自己配置一个 mysql jar 包,并正确配置它的路径

这里有个技巧:在 IDEA 中找到项目所依赖的 jar 包

  • 如图所示,选择对应的依赖后,可以直接在文件管理器中打开

image-20230922213029171

  • 然后直接在文件管理器中复制,粘贴到 Logstash 目录下即可

加载配置、启动 Logstash,启动成功了:

image-20230922215626868

聊聊我在这段配置上踩过的坑吧:

  • mysql jar 包路径外层多加了一层双引号
  • 用户名、密码配置错误
  • SQL 语句中 where 多写了一个
  • timestamp 写成 timestampe

这段配置绝对不能出现任何问题,否则就会出现严重的报错。我的最终配置是这样的:

# Sample Logstash configuration for receiving
# UDP syslog messages over port 514

input {
  jdbc {
    jdbc_driver_library => "D:\softWare\logstash\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/memory_search"
    jdbc_user => "root"
    jdbc_password => "Dw990831"
    statement => "SELECT * from post where 1 = 1"
    schedule => "*/5 * * * *"
  }
}

output {
  stdout { codec => rubydebug }
}

启动成功后,现在的 Logstash 是每 5 秒从数据库中同步所有数据(当然这是根据 SQL 语句来执行的),这数据量可能会很大

这就是全量同步了,我们不需要同步所有数据,我们可以选择同步最近更新的数据

添加如下配置:

    statement => "SELECT * from post where updateTime > :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updateTime"
    schedule => "*/5 * * * * *"
    jdbc_default_timezone => "Asia/Shanghai"

这段配置就是根据 updateTime 字段的最新值,同步 updateTime 大于最新值的数据:

image-20230922224744313

  • 所以说,sql_last_value 指定的是上次查到的数据的最后一行的指定字段,新的查询就是比较这个指定字段与 sql_last_value 的大小
  • 但是经过多次查询发现,这里的 sql_last_value 始终不变
    • 我们可以在 data\plugins\inputs\jdbc\logstash_jdbc_last_run 看到 sql_last_value 指定的数据,确实没有变化:

image-20230922230142159

  • 将 tracking_column => "updateTime" 的 updateTime 修改为 updatetime,日期同步成功

image-20230922225440159

更新下数据库中的最新值,再看看效果,确实拿到了数据库中最新修改的值(参照上次修改后的最新值):

image-20230922230720711

# 同步 ES

  • 调试这么久,Logstash 能够正常同步 MySQL 了,接下来就是把同步到 input 的数据,同步到 ES 中了**(2023/09/22 晚)**

  • 直接在官方文档中,找到输出 output 的相关配置:

image-20230923124744806

  • 跟着官网简单的 demo 学就行,配置过一次就会了,这是我完成同步 ES 后的配置:(部分私密信息已做处理)
# Sample Logstash configuration for receiving
# UDP syslog messages over port 514

input {
  jdbc {
    jdbc_driver_library => "D:\softWare\logstash\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/"******""
    jdbc_user => "******"
    jdbc_password => ""******""
    statement => "SELECT * from post where updateTime > :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updatetime"
    schedule => "*/5 * * * * *"
    jdbc_default_timezone => "Asia/Shanghai"
  }
}

output {
  stdout { codec => rubydebug }

  elasticsearch {
    hosts => "127.0.0.1:9200"
    index => "post_v1"
    document_id => "%{id}"
  }
}

这里简单介绍下这几个配置的作用:

  • host:标识要进行同步的 ES 地址,即指定了:数据从 MySQL 中取出后,发送到哪

  • index:目标索引

  • document_id:指定目标索引内,每一个文档的 id,就是从 SELECT * 中解构出 id 值

  • data_stream:特殊的数据格式,我们从数据库中取到的都是普通类型,不需要这行配置

  • 其他的目前暂且不需要了解,日后再进一步学习

加载配置,运行 Logstash,可以看到运行成功了,数据库中最新更新的数据也成功同步到了本地的 ES 上了:

image-20230923125842246

从同步结果来看,我们还需要解决几个问题:

  • 排除某些不需要同步的字段
  • ES 中同步过来的文档数据字段都是全小写,不是驼峰式
  • 查询结果按 updateTime 降序排列,避免重排序,导致多同步了不必要的数据,造成性能浪费

解决这三个问题当然很简单:

首先修改下 SQL 语句:

statement => "SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc"

再写入如下过滤配置,将对应字段进行驼峰式转换,并排除不需要的字段:

filter {
  mutate {
    rename => {
      "updatetime" => "updateTime"
      "userid" => "userId"
      "createtime" => "createTime"
      "isdelete" => "isDelete"
    }
    remove_field => ["thumbnum", "favournum"]
  }
}

重新进行同步,结果完美,顺利完成:

image-20230923144500259

# 🍚 Canal 监控数据库流水

# 基础配置

🥣 推荐阅读:

  • Java 开发 - Canal 的基本用法_canal java-CSDN 博客 (opens new window)
  • ClientExample · alibaba/canal Wiki (github.com) (opens new window)

我们简单实现通过 Canal 监控数据库流水,实时监听 MySQL 中的数据变更,并实时同步变更的数据到 Elasticsearch 中

# 本地数据库配置

  • 创建新用户:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
  • 在本地 MySQL 中的 my.ini 文件中做如下配置(将本地的 MySQL 作为一个主节点,开启本地 binlog 生成):
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

# Canal 的下载安装

  • ClientExample · alibaba/canal Wiki (github.com) (opens new window)

# Canal 配置

  • 在 \conf\example\instance.properties 目录下做如下配置:
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

image-20231204204312817

# 启动 Canal

  • 在 bin 目录下,输入以下命令启动:
startup.bat

image-20231204204656098

# 🥣 实战操作

导入相关依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
  • 我们拉取官网提供的 demo 代码,直接启动:

image-20231204203419273

package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}
}

本地数据库修改数据,测试 Canal 监控数据库流水

  • 如下,我们修改本地数据库中的一条记录,发现这次数据变更已经被捕捉到并打印出来了

image-20231204204957589

  • 至此,使用 Canal 实现实时监控数据变更就完成了,当然这只是简单测试了一下,日后会着手实现基于该方法同步 MySQL 中的变更数据到 ES 中
外源数据抓取
流量速率管控

← 外源数据抓取 流量速率管控→

Theme by Vdoing | Copyright © 2023-2024 回忆如初
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式