基于 Apache SeaTunnel 的 MySQL 到 Elasticsearch 实时同步解决方案


作者 | javalover123

11




前言


  • 最近,项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。
  • 目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步任务每月 500多元,有点小贵。
  • 其他环境:MySQL同步到ES,用的是 CloudCanal,不支持 数据转换,添加同步字段比较麻烦,社区版限制5个任务,不够用;MySQL同步到MySQL,用的是 debezium,不支持写入 ES。
  • 恰好3年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 系统为例

22




开源数据集成平台SeaTunnel

1. 简介

  • SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵活易用、易扩展并支持千亿级数据集成的解决方案。
  • Seaunnel 为实时(CDC)和批量数据提供高性能数据同步能力,支持超百种数据源(https://seatunnel.apache.org/docs/2.3.1/Connector-v2-release-state/),已经在B站、腾讯云、字节等数百家公司使用。
  • 可以选择 SeaTunnel Zeta 引擎上运行,也可以在 Apache Flink 或 Spark 引擎上运行。

2. 安装

  • 下载,这里选择 2.3.1 版本(https://seatunnel.apache.org/download/),执行 tar -xzvf apache-seatunnel-*.tar.gz 解压缩
  • 因为 2.3.2 版本,MySQL-CDC 找不到驱动,bug修复详见(https://github.com/apache/seatunnel/pull/4945/files)。
Caused by: java.sql.SQLException: No suitable driver
        at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
        at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
        ... 20 more

        ... 11 more

        at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)

3. 安装 connectors 插件

  • 执行 bash bin/install-plugin.sh,国内建议先配置 maven
    镜像,不然容易失败或者慢
  • 官方文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了PR(https://github.com/apache/seatunnel-website/pull/253)。

4. 编写配置文件

  • config 目录下,新建配置文件:如 mysql-es-test.conf
  • 添加 env 配置。因为是实时同步,这里 job.mode = "STREAMING,execution.parallelism 是并发数
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
  • MySQL 实时同步,需开启 binlog(https://debezium.io/documentation/reference/1.6/connectors/mysql.html#setting-up-mysql)
  • 添加数据源配置(https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/MySQL-CDC#options)。result_table_name 取个 临时表名,便于后续使用。table-names 必须是数据库.表名,base-url 必须指定数据库。startup.mode 默认是 INITIAL,先同步历史数据,后增量同步,详情请查看(https://github.com/apache/seatunnel/blob/3cd51b6defd3ddd3b011cf0f6b48f3c209bf9d22/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/StartupMode.java#L27)
source {
  MySQL-CDC {
    result_table_name = "t1"
    server-id = 5656
    username = "root"
    password = "pwd"
    table-names = ["db.t1"]
    base-url = "jdbc:mysql://host:3306/db"
  }
}
  • 添加 转换 配置,sql 比较灵活。函数列表请查看(https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions)
transform {
  Sql {
    source_table_name = "t1"
    query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 = '1'"
  }
}
  • 添加输出配置(https://seatunnel.apache.org/docs/2.3.1/connector-v2/sink/Elasticsearch#options),CDC 实时同步 es,必须配置 primary_keys
sink {
    Elasticsearch {
        hosts = ["host:9200"]
        username = "elastic"
        password = "pwd"

        index = "index_t1"
        # cdc required options
        primary_keys = ["id"]
    }
}
  • 最终配置截图

5. 启动任务

这里以本地模式为例(https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/local-mode),另有集群(https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/deployment)、spark、flink 模式。

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf


33




总结


  • 开源数据集成平台SeaTunnel 能够比较方便的进行 MySQL 实时同步到 es 等,免费,还方便添加 同步字段。更多强大功能,请看官方文档(https://seatunnel.apache.org/docs/2.3.1/about)。
  • 新版本自带同步引擎(https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/about),不用依赖 spark、flink 等运行,降低了 小数据量同步场景 部署复杂度
  • 新版本开始提供 UI界面(https://github.com/apache/seatunnel-web),目前强依赖调度平台 Apache DolphinScheduler
本文遵守【CC BY-NC】协议,转载请保留原文出处及本版权声明,否则将追究法律责任。本文首先发布于 https://www.890808.xyz/data-sync-apache-seatunnel/。
欢迎投稿,投稿内容包含不限于:Apache SeaTunnel数据同步案例、使用经验分享、故障处理、源码分析、项目贡献、参与开源的故事、技术文章翻译、会议分享整理等。

投稿形式包含不限于:文字、视频、漫画、音频、直播等。

欢迎关注SeaTunnel微信公众号投稿:

Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/seatunnel

网址:
https://seatunnel.apache.org/

Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/seatunnel/issues

贡献代码:
https://github.com/apache/seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

精彩推荐




Apache SeaTunnel Connector 使用文档和使用案例有奖征稿来了!一起玩开源




OPPO 选择 SeaTunnel:提高了平台的开发和生产能力




讲师征集令 | Apache SeaTunnel Meetup 分享嘉宾火热招募中!



一键三连-点赞在看转发⭐️!



下载地址

免责声明:

1、本站资源由自动抓取工具收集整理于网络。

2、本站不承担由于内容的合法性及真实性所引起的一切争议和法律责任。

3、电子书、小说等仅供网友预览使用,书籍版权归作者或出版社所有。

4、如作者、出版社认为资源涉及侵权,请联系本站,本站将在收到通知书后尽快删除您认为侵权的作品。

5、如果您喜欢本资源,请您支持作者,购买正版内容。

6、资源失效,请下方留言,欢迎分享资源链接

文章评论

0条评论