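Synchronizing data between Elasticsearch and MySQL

I. Install Elasticsearch

# Elasticsearch runs on the JVM, so Java is installed first (the Elasticsearch 8.x tarball also ships a bundled JDK, so this step is optional for Elasticsearch itself)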

[root@localhost ~]# java -version # check whether Java is already installed

[root@localhost ~]# yum search java # list the available Java packages

[root@localhost ~]# yum install java-17-openjdk-headless.x86_64 # install the java-17-openjdk-headless package

[root@localhost ~]# java -version

openjdk version "17.0.6" 2023-01-17 LTS

OpenJDK Runtime Environment (Red_Hat-17.0.6.0.10-3.el9) (build 17.0.6+10-LTS)

OpenJDK 64-Bit Server VM (Red_Hat-17.0.6.0.10-3.el9) (build 17.0.6+10-LTS, mixed mode, sharing)

# Install Elasticsearch

[root@localhost home]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.2-linux-x86_64.tar.gz # download Elasticsearch; the package can also be downloaded from the official site, https://www.elastic.co/cn/elasticsearch

[root@localhost home]# tar -zxvf elasticsearch-8.12.2-linux-x86_64.tar.gz # extract

# Start Elasticsearch

[root@localhost home]# cd /home/elasticsearch-8.12.2/

[root@localhost elasticsearch-8.12.2]# ./bin/elasticsearch

CompileCommand: exclude org/apache/lucene/util/MSBRadixSorter.computeCommonPrefixLengthAndBuildHistogram bool exclude = true

CompileCommand: exclude org/apache/lucene/util/RadixSelector.computeCommonPrefixLengthAndBuildHistogram bool exclude = true

二月 23, 2024 11:25:43 上午 sun.util.locale.provider.LocaleProviderAdapter

WARNING: COMPAT locale provider will be removed in a future release

[2024-02-23T11:25:44,075][ERROR][o.e.b.Elasticsearch ] [localhost] fatal exception while booting Elasticsearch

java.lang.RuntimeException: can not run elasticsearch as root

at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.initializeNatives(Elasticsearch.java:282)

at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.initPhase2(Elasticsearch.java:167)

at org.elasticsearch.server@8.12.2/org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:72)

See logs for more details.

ERROR: Elasticsearch did not exit normally - check the logs at /home/elasticsearch-8.12.2/logs/elasticsearch.log

ERROR: Elasticsearch exited unexpectedly, with exit code 1

# The log at /home/elasticsearch-8.12.2/logs/elasticsearch.log shows the same cause: Elasticsearch cannot be started as the root user, so create a dedicated user

[root@localhost home]# useradd es # add a user

[root@localhost home]# passwd es # set its password

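Changing password for user es.

New password:

BAD PASSWORD: The password is shorter than 8 characters

Retype new password:

passwd: all authentication tokens updated successfully.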

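[root@localhost home]# chown -R es:es /home/elasticsearch-8.12.2 # give the es user and group ownership of the installation

[root@localhost home]# chmod -R 777 /home/elasticsearch-8.12.2 # open up permissions (after the chown above, a narrower mode such as 750 should also work and is safer)

[root@localhost home]# su es # switch to the es user

[es@localhost elasticsearch-8.12.2]$ ./bin/elasticsearch # start Elasticsearch (add -d to run it in the background)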

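[2024-02-23T14:17:51,625][WARN ][o.e.h.n.Netty4HttpServerTransport] [localhost] received plaintext http traffic on an https channel, closing connection Netty4HttpChannel{localAddress=/192.168.243.134:9200, remoteAddress=/192.168.243.1:62306}

# This warning appears when a browser opens http://ip:9200. Elasticsearch 8 enables security, including TLS on the HTTP layer, by default, so plain HTTP requests are rejected. For a local test setup, change xpack.security.enabled: true to xpack.security.enabled: false in config/elasticsearch.yml (alternatively, keep security on and use https://ip:9200)

[es@localhost elasticsearch-8.12.2]$ ./bin/elasticsearch # restart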
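
The edit to config/elasticsearch.yml can also be scripted. The following is only a minimal sketch: set_yaml_flag is a hypothetical helper (not part of Elasticsearch), and it assumes the setting is written as a flat dotted key on its own line. Turning security off is reasonable for a local test machine only.

```python
import re


def set_yaml_flag(text: str, key: str, value: str) -> str:
    """Return `text` with the flat `key: ...` line set to `value`.

    If the key is not present (commented-out lines do not count),
    the setting is appended at the end.
    """
    pattern = re.compile(rf"^{re.escape(key)}\s*:.*$", re.MULTILINE)
    line = f"{key}: {value}"
    if pattern.search(text):
        # Use a function as replacement so `line` is inserted literally.
        return pattern.sub(lambda _match: line, text)
    return text.rstrip("\n") + "\n" + line + "\n"


# Stand-in for the contents of config/elasticsearch.yml.
sample = "cluster.name: demo\nxpack.security.enabled: true\n"
print(set_yaml_flag(sample, "xpack.security.enabled", "false"))
```

To apply it to the real file, read config/elasticsearch.yml, pass its text through the helper, and write the result back before restarting the node.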

II. Install MySQL

For installing MySQL 8, see my other blog post: https://editor.csdn.net/md/?articleId=135905811

III. Install Logstash

1. Download and install Logstash

[root@localhost home]# wget https://artifacts.elastic.co/downloads/logstash/logstash-8.12.2-linux-x86_64.tar.gz # same version as Elasticsearch

[root@localhost home]# tar -zxvf logstash-8.12.2-linux-x86_64.tar.gz # extract

2. Logstash configuration: logstash.yml

# Settings file in YAML

#

# Settings can be specified either in hierarchical form, e.g.:

# Set the pipeline batch size and batch delay in hierarchical form:

#   pipeline:

#     batch:

#       size: 125 # pipeline batch size

#       delay: 5 # pipeline batch delay

#

# Or as flat keys:

# The same values expressed as flat keys:

#   pipeline.batch.size: 125

#   pipeline.batch.delay: 5

#

# ------------ Node identity ------------

#

# Use a descriptive name for the node:

#

# node.name: test # node name; must be unique within the cluster and defaults to the hostname of the Logstash machine

#

# If omitted the node name will default to the machine's host name

#

# ------------ Data path ------------------

#

# Which directory should be used by logstash and its plugins

# for any persistent needs. Defaults to LOGSTASH_HOME/data

#

# path.data: # directory used by Logstash and its plugins

#

# ------------ Pipeline Settings --------------

#

# The ID of the pipeline.

#

# pipeline.id: main # pipeline ID, defaults to main

#

# Set the number of workers that will, in parallel, execute the filters+outputs

# stage of the pipeline.

#

# This defaults to the number of the host's CPU cores.

#

# pipeline.workers: 2 # number of workers that run the filter and output stages in parallel; defaults to the number of CPU cores

#

# How many events to retrieve from inputs before sending to filters+workers

#

# pipeline.batch.size: 125 # maximum number of events a single worker collects from the inputs

#

# How long to wait in milliseconds while polling for the next event

# before dispatching an undersized batch to filters+outputs

#

# pipeline.batch.delay: 50 # how long, in milliseconds, to wait for the next event before dispatching an undersized batch to the pipeline workers

#

# Force Logstash to exit during shutdown even if there are still inflight

# events in memory. By default, logstash will refuse to quit until all

# received events have been pushed to the outputs.

#

# WARNING: Enabling this can lead to data loss during shutdown

#

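# pipeline.unsafe_shutdown: false # when true, Logstash is forced to exit during shutdown even if in-flight events remain in memory; by default it refuses to exit until every received event has been pushed to the outputs. Enabling this can lose data during shutdown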

#

# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".

# "auto" automatically enables ordering if the 'pipeline.workers' setting

# is also set to '1', and disables otherwise.

# "true" enforces ordering on the pipeline and prevent logstash from starting

# if there are multiple workers.

# "false" disables any extra processing necessary for preserving ordering.

#

# pipeline.ordered: auto # 1. auto: event ordering is enabled automatically when pipeline.workers is 1; 2. true: ordering is enforced; 3. false: ordering is disabled

#

# Sets the pipeline's default value for `ecs_compatibility`, a setting that is

# available to plugins that implement an ECS Compatibility mode for use with

# the Elastic Common Schema.

# Possible values are:

# - disabled

# - v1

# - v8 (default)

# Pipelines defined before Logstash 8 operated without ECS in mind. To ensure a

# migrated pipeline continues to operate as it did before your upgrade, opt-OUT

# of ECS for the individual pipeline in its `pipelines.yml` definition. Setting

# it here will set the default for _all_ pipelines, including new ones.

#

# pipeline.ecs_compatibility: v8

#

# ------------ Pipeline Configuration Settings --------------

#

# Where to fetch the pipeline configuration for the main pipeline

#

# path.config: # path to the Logstash configuration for the main pipeline

#

# Pipeline configuration string for the main pipeline

#

# config.string: # string containing the pipeline configuration for the main pipeline

#

# At startup, test if the configuration is valid and exit (dry run)

#

# config.test_and_exit: false # when true, validate the configuration at startup and exit; defaults to false

#

# Periodically check if the configuration has changed and reload the pipeline

# This can also be triggered manually through the SIGHUP signal

#

# config.reload.automatic: false # periodically check whether the configuration has changed and reload it; defaults to false

#

# How often to check if the pipeline configuration has changed (in seconds)

# Note that the unit value (s) is required. Values without a qualifier (e.g. 60)

# are treated as nanoseconds.

# Setting the interval this way is not recommended and might change in later versions.

#

# config.reload.interval: 3s # how often to check the pipeline configuration for changes

#

# Show fully compiled configuration as debug log message

# NOTE: --log.level must be 'debug'

#

# config.debug: false # when true, log the fully compiled configuration as a debug message; log.level: debug must also be set

#

# When enabled, process escaped characters such as \n and \" in strings in the

# pipeline configuration files.

#

# config.support_escapes: false # whether to process escape sequences in strings

#

# ------------ API Settings -------------

# Define settings related to the HTTP API here.

#

# The HTTP API is enabled by default. It can be disabled, but features that rely

# on it will not work as intended.

#

# api.enabled: true # whether the HTTP API is enabled

#

# By default, the HTTP API is not secured and is therefore bound to only the

# host's loopback interface, ensuring that it is not accessible to the rest of

# the network.

# When secured with SSL and Basic Auth, the API is bound to _all_ interfaces

# unless configured otherwise.

#

# api.http.host: 127.0.0.1 # HTTP API bind address

#

# The HTTP API web server will listen on an available port from the given range.

# Values can be specified as a single port (e.g., `9600`), or an inclusive range

# of ports (e.g., `9600-9700`).

#

# api.http.port: 9600-9700 # HTTP API port or port range

#

# The HTTP API includes a customizable "environment" value in its response,

# which can be configured here.

#

# api.environment: "production" # environment value returned in HTTP API responses

#

# The HTTP API can be secured with SSL (TLS). To do so, you will need to provide

# the path to a password-protected keystore in p12 or jks format, along with credentials.

#

# api.ssl.enabled: false # whether SSL is enabled

# api.ssl.keystore.path: /path/to/keystore.jks # path to the SSL keystore

# api.ssl.keystore.password: "y0uRp4$$w0rD" # keystore password

#

# The availability of SSL/TLS protocols depends on the JVM version. Certain protocols are

# disabled by default and need to be enabled manually by changing `jdk.tls.disabledAlgorithms`

# in the $JDK_HOME/conf/security/java.security configuration file.

#

# api.ssl.supported_protocols: [TLSv1.2,TLSv1.3]

#

# The HTTP API can be configured to require authentication. Acceptable values are

# - `none`: no auth is required (default)

# - `basic`: clients must authenticate with HTTP Basic auth, as configured

# with `api.auth.basic.*` options below

# api.auth.type: none

#

# When configured with `api.auth.type` `basic`, you must provide the credentials

# that requests will be validated against. Usage of Environment or Keystore

# variable replacements is encouraged (such as the value `"${HTTP_PASS}"`, which

# resolves to the value stored in the keystore's `HTTP_PASS` variable if present

# or the same variable from the environment)

#

# api.auth.basic.username: "logstash-user"

# api.auth.basic.password: "s3cUreP4$$w0rD"

#

# When setting `api.auth.basic.password`, the password should meet

# the default password policy requirements.

# The default password policy requires non-empty minimum 8 char string that

# includes a digit, upper case letter and lower case letter.

# Policy mode sets Logstash to WARN or ERROR when HTTP authentication password doesn't

# meet the password policy requirements.

# The default is WARN. Setting to ERROR enforces stronger passwords (recommended).

#

# api.auth.basic.password_policy.mode: WARN

#

# ------------ Module Settings ---------------

# Define modules here. Modules definitions must be defined as an array.

# The simple way to see this is to prepend each `name` with a `-`, and keep

# all associated variables under the `name` they are associated with, and

# above the next, like this:

# Modules are defined here and must be given as an array.

# Module variable names must use the format var.PLUGIN_TYPE.PLUGIN_NAME.KEY

#

# modules:

#   - name: MODULE_NAME

#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE

#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE

#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE

#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE

#

# Module variable names must be in the format of

#

# var.PLUGIN_TYPE.PLUGIN_NAME.KEY

#

# modules:

#

# ------------ Cloud Settings ---------------

# Define Elastic Cloud settings here.

# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy

# and it may have an label prefix e.g. staging:dXMtZ...

# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'

# cloud.id:

#

# Format of cloud.auth is: username:password

# This is optional

# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'

# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'

# cloud.auth: elastic:

#

# ------------ Queuing Settings --------------

#

# Internal queuing model, "memory" for legacy in-memory based queuing and

# "persisted" for disk-based acked queueing. Defaults is memory

# Internal queuing model for event buffering: memory (in memory) or persisted (on disk); defaults to memory

# queue.type: memory

#

# If `queue.type: persisted`, the directory path where the pipeline data files will be stored.

# Each pipeline will group its PQ files in a subdirectory matching its `pipeline.id`.

# Default is path.data/queue.

# With the persistent queue (queue.type: persisted), the directory where the pipeline data files are stored; defaults to path.data/queue

# path.queue:

#

# If using queue.type: persisted, the page data files size. The queue data consists of

# append-only data files separated into pages. Default is 64mb

# With the persistent queue (queue.type: persisted), the size of each page data file; defaults to 64mb

# queue.page_capacity: 64mb

#

# If using queue.type: persisted, the maximum number of unread events in the queue.

# Default is 0 (unlimited)

# With the persistent queue (queue.type: persisted), the maximum number of unread events in the queue; defaults to 0 (unlimited)

# queue.max_events: 0

#

# If using queue.type: persisted, the total capacity of the queue in number of bytes.

# If you would like more unacked events to be buffered in Logstash, you can increase the

# capacity using this setting. Please make sure your disk drive has capacity greater than

# the size specified here. If both max_bytes and max_events are specified, Logstash will pick

# whichever criteria is reached first

# Default is 1024mb or 1gb

# With the persistent queue (queue.type: persisted), the total capacity of the queue in bytes; defaults to 1024mb

# queue.max_bytes: 1024mb

#

# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint

# Default is 1024, 0 for unlimited

# With the persistent queue (queue.type: persisted), the maximum number of acked events before a checkpoint is forced; defaults to 1024, 0 means unlimited

# queue.checkpoint.acks: 1024

#

# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint

# Default is 1024, 0 for unlimited

# With the persistent queue (queue.type: persisted), the maximum number of written events before a checkpoint is forced; defaults to 1024, 0 means unlimited

# queue.checkpoint.writes: 1024

#

# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page

# Default is 1000, 0 for no periodic checkpoint.

# With the persistent queue (queue.type: persisted), the interval in milliseconds at which a checkpoint is forced; defaults to 1000, 0 means no periodic checkpoint

# queue.checkpoint.interval: 1000

#

# ------------ Dead-Letter Queue Settings --------------

# Flag to turn on dead-letter queue.

# Whether to enable the dead-letter queue; defaults to false

# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries

# will be dropped if they would increase the size of the dead letter queue beyond this setting.

# Default is 1024mb

# Maximum size of each dead-letter queue; entries that would push it past this size are dropped; defaults to 1024mb

# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ

# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files

# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and

# being available to be read by the dead_letter_queue input when items are written infrequently.

# Default is 5000.

# With the dead-letter queue enabled, the flush interval for writing queue files; defaults to 5000 ms

# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.

# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.

# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.

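# With the dead-letter queue enabled, controls which entries are dropped to stay within the size limit. drop_newer (the default) stops accepting new events that would push the queue over the limit; drop_older removes the queue pages holding the oldest events to make room for new ones.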

# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has

# expired the events could be automatically deleted from the DLQ.

# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,

# to represent a five days interval.

# The available units are respectively d, h, m, s for day, hours, minutes and seconds.

# If not specified then the DLQ doesn't use any age policy for cleaning events.

# How long events in the dead-letter queue remain valid; expired events are deleted from the queue.

# dead_letter_queue.retain.age: 1d

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.

# Default is path.data/dead_letter_queue

# Storage path of the dead-letter queue

# path.dead_letter_queue:

#

# ------------ Debugging Settings --------------

#

# Options for log.level:

# * fatal

# * error

# * warn

# * info (default)

# * debug

# * trace

# Log level

# log.level: info

# Log path

# path.logs:

#

# ------------ Other Settings --------------

#

# Allow or block running Logstash as superuser (default: true)

# Whether the superuser is allowed to run Logstash

# allow_superuser: false

#

# Where to find custom plugins

# Path to custom plugins

# path.plugins: []

#

# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name

# Default is false

# Whether to write each pipeline's log lines to its own log file

# pipeline.separate_logs: false

#

# ------------ X-Pack Settings (not applicable for OSS build)--------------

#

# X-Pack Monitoring

# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html

#xpack.monitoring.enabled: false

#xpack.monitoring.elasticsearch.username: logstash_system

#xpack.monitoring.elasticsearch.password: password

#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]

#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]

# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth

#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx

#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password

# another authentication alternative is to use an Elasticsearch API key

#xpack.monitoring.elasticsearch.api_key: "id:api_key"

#xpack.monitoring.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"

#xpack.monitoring.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx

#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file

#xpack.monitoring.elasticsearch.ssl.truststore.password: password

# use either keystore.path/keystore.password or certificate/key configurations

#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file

#xpack.monitoring.elasticsearch.ssl.keystore.password: password

#xpack.monitoring.elasticsearch.ssl.certificate: /path/to/file

#xpack.monitoring.elasticsearch.ssl.key: /path/to/key

#xpack.monitoring.elasticsearch.ssl.verification_mode: full

#xpack.monitoring.elasticsearch.ssl.cipher_suites: []

#xpack.monitoring.elasticsearch.sniffing: false

#xpack.monitoring.collection.interval: 10s

#xpack.monitoring.collection.pipeline.details.enabled: true

#

# X-Pack Management

# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html

#xpack.management.enabled: false

#xpack.management.pipeline.id: ["main", "apache_logs"]

#xpack.management.elasticsearch.username: logstash_admin_user

#xpack.management.elasticsearch.password: password

#xpack.management.elasticsearch.proxy: ["http://proxy:port"]

#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]

# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth

#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx

#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password

# another authentication alternative is to use an Elasticsearch API key

#xpack.management.elasticsearch.api_key: "id:api_key"

#xpack.management.elasticsearch.ssl.ca_trusted_fingerprint: xxxxxxxxxx

#xpack.management.elasticsearch.ssl.certificate_authority: "/path/to/ca.crt"

#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file

#xpack.management.elasticsearch.ssl.truststore.password: password

# use either keystore.path/keystore.password or certificate/key configurations

#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file

#xpack.management.elasticsearch.ssl.keystore.password: password

#xpack.management.elasticsearch.ssl.certificate: /path/to/file

#xpack.management.elasticsearch.ssl.key: /path/to/certificate_key_file

#xpack.management.elasticsearch.ssl.cipher_suites: []

#xpack.management.elasticsearch.ssl.verification_mode: full

#xpack.management.elasticsearch.sniffing: false

#xpack.management.logstash.poll_interval: 5s

# X-Pack GeoIP Database Management

# https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-manage_update

#xpack.geoip.downloader.enabled: true

#xpack.geoip.downloader.endpoint: "https://geoip.elastic.co/v1/database"
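
The top of logstash.yml notes that a setting can be written either hierarchically or as a flat dotted key. As an illustration of why the two forms are equivalent, here is a small sketch that flattens a nested mapping into dotted keys. The flatten function is a hypothetical helper written for this post and is not how Logstash itself parses the file.

```python
def flatten(settings: dict, prefix: str = "") -> dict:
    """Turn nested settings into flat dotted keys, e.g. pipeline.batch.size."""
    flat = {}
    for key, value in settings.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections and merge their dotted keys.
            flat.update(flatten(value, dotted))
        else:
            flat[dotted] = value
    return flat


# The hierarchical example from the top of logstash.yml.
hierarchical = {"pipeline": {"batch": {"size": 125, "delay": 5}}}
print(flatten(hierarchical))
```

Both spellings end up naming the same two settings, pipeline.batch.size and pipeline.batch.delay, so which form to use is a matter of taste.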

3. pipelines.yml configuration

# List of pipelines to be loaded by Logstash

#

# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.

# Default values for omitted settings are read from the `logstash.yml` file.

# When declaring multiple pipelines, each MUST have its own `pipeline.id`.

#

# Example of two pipelines:

#

# - pipeline.id: test

#   pipeline.workers: 1

#   pipeline.batch.size: 1

#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"

# - pipeline.id: another_test

#   queue.type: persisted

#   path.config: "/tmp/logstash/*.config"

#

# Available options:

#

# # name of the pipeline

# pipeline.id: mylogs

#

# # The configuration string to be used by this pipeline

# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"

#

# # The path from where to read the configuration text

# path.config: "/etc/conf.d/logstash/myconfig.cfg"

#

# # How many worker threads execute the Filters+Outputs stage of the pipeline

# pipeline.workers: 1 (actually defaults to number of CPUs)

#

# # How many events to retrieve from inputs before sending to filters+workers

# pipeline.batch.size: 125

#

# # How long to wait in milliseconds while polling for the next event

# # before dispatching an undersized batch to filters+outputs

# pipeline.batch.delay: 50

#

# # Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".

# "auto" automatically enables ordering if the 'pipeline.workers' setting

# is also set to '1', and disables otherwise.

# "true" enforces ordering on a pipeline and prevents logstash from starting

# a pipeline with multiple workers allocated.

# "false" disable any extra processing necessary for preserving ordering.

#

# pipeline.ordered: auto

#

# # Internal queuing model, "memory" for legacy in-memory based queuing and

# # "persisted" for disk-based acked queueing. Defaults is memory

# queue.type: memory

#

# # If using queue.type: persisted, the page data files size. The queue data consists of

# # append-only data files separated into pages. Default is 64mb

# queue.page_capacity: 64mb

#

# # If using queue.type: persisted, the maximum number of unread events in the queue.

# # Default is 0 (unlimited)

# queue.max_events: 0

#

# # If using queue.type: persisted, the total capacity of the queue in number of bytes.

# # Default is 1024mb or 1gb

# queue.max_bytes: 1024mb

#

# # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint

# # Default is 1024, 0 for unlimited

# queue.checkpoint.acks: 1024

#

# # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint

# # Default is 1024, 0 for unlimited

# queue.checkpoint.writes: 1024

#

# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page

# # Default is 1000, 0 for no periodic checkpoint.

# queue.checkpoint.interval: 1000

#

# # Enable Dead Letter Queueing for this pipeline.

# dead_letter_queue.enable: false

#

# If using dead_letter_queue.enable: true, the maximum size of dead letter queue for this pipeline. Entries

# will be dropped if they would increase the size of the dead letter queue beyond this setting.

# Default is 1024mb

# dead_letter_queue.max_bytes: 1024mb

#

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ

# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files

# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and

# being available to be read by the dead_letter_queue input when items are written infrequently.

# Default is 5000.

#

# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, controls which entries should be dropped to avoid exceeding the size limit.

# Set the value to `drop_newer` (default) to stop accepting new events that would push the DLQ size over the limit.

# Set the value to `drop_older` to remove queue pages containing the oldest events to make space for new ones.

#

# dead_letter_queue.storage_policy: drop_newer

# If using dead_letter_queue.enable: true, the interval that events have to be considered valid. After the interval has

# expired the events could be automatically deleted from the DLQ.

# The interval could be expressed in days, hours, minutes or seconds, using as postfix notation like 5d,

# to represent a five days interval.

# The available units are respectively d, h, m, s for day, hours, minutes and seconds.

# If not specified then the DLQ doesn't use any age policy for cleaning events.

#

# dead_letter_queue.retain.age: 1d

#

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.

# Default is path.data/dead_letter_queue

#

# path.dead_letter_queue:
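
The pipeline.ordered comments in both files describe three modes. The sketch below restates that rule as a small function so the interaction with pipeline.workers is easier to see. It is based only on the comments quoted above; ordering_enabled is a hypothetical name, and Logstash's real implementation may differ in detail.

```python
def ordering_enabled(ordered: str, workers: int) -> bool:
    """Decide whether event ordering is preserved, per the config comments."""
    if ordered == "auto":
        # auto: ordering is on only when there is a single worker.
        return workers == 1
    if ordered == "true":
        # true: ordering is enforced, and multiple workers are refused.
        if workers > 1:
            raise ValueError("pipeline.ordered: true requires pipeline.workers: 1")
        return True
    if ordered == "false":
        return False
    raise ValueError(f"unknown pipeline.ordered value: {ordered!r}")


for mode, workers in [("auto", 1), ("auto", 4), ("false", 1)]:
    print(mode, workers, ordering_enabled(mode, workers))
```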

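IV. Synchronizing MySQL data to Elasticsearch

Synchronization options: 1. Logstash 2. go-mysql-elasticsearch 3. canal (Alibaba)

Method 1: Logstash

1. Install the mysql-connector-java JDBC driver (its version should match your MySQL version)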

[root@localhost home]# wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.33-1.el9.noarch.rpm

--2024-02-23 16:01:15-- https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j-8.0.33-1.el9.noarch.rpm

Resolving downloads.mysql.com (downloads.mysql.com)... 23.66.135.36, 2600:1406:3c00:18b::2e31, 2600:1406:3c00:189::2e31

Connecting to downloads.mysql.com (downloads.mysql.com)|23.66.135.36|:443... connected.

HTTP request sent, awaiting response... 302 Moved Temporarily

Location: https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33-1.el9.noarch.rpm [following]

--2024-02-23 16:01:16-- https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-j-8.0.33-1.el9.noarch.rpm

Resolving cdn.mysql.com (cdn.mysql.com)... 23.42.93.135, 2600:1406:3a00:293::1d68, 2600:1406:3a00:282::1d68

Connecting to cdn.mysql.com (cdn.mysql.com)|23.42.93.135|:443... connected.

HTTP request sent, awaiting response... 200 OK

Length: 2425346 (2.3M) [application/x-redhat-package-manager]

Saving to: 'mysql-connector-j-8.0.33-1.el9.noarch.rpm'

mysql-connector-j-8.0.33-1.el9.noarch.rpm 100%[========================================>] 2.31M 736KB/s in 3.2s

2024-02-23 16:01:21 (736 KB/s) - 'mysql-connector-j-8.0.33-1.el9.noarch.rpm' saved [2425346/2425346]

[root@localhost home]# rpm -ivh mysql-connector-j-8.0.33-1.el9.noarch.rpm # install

warning: mysql-connector-j-8.0.33-1.el9.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID 3a79bd29: NOKEY

error: Failed dependencies:

java-headless >= 1:1.8.0 is needed by mysql-connector-j-1:8.0.33-1.el9.noarch

[root@localhost home]# yum -y install java-headless # install the missing java-headless dependency

[root@localhost home]# rpm -ivh mysql-connector-j-8.0.33-1.el9.noarch.rpm

warning: mysql-connector-j-8.0.33-1.el9.noarch.rpm: Header V4 RSA/SHA256 Signature, key ID 3a79bd29: NOKEY

Verifying... ################################# [100%]

Preparing... ################################# [100%]

Updating / installing...

1:mysql-connector-j-1:8.0.33-1.el9 ################################# [100%]

# Installation complete. By default the driver JAR is placed under /usr/share/java/ as mysql-connector-java.jar. The configuration below references /usr/share/java/mysql-connector-j.jar, so run rpm -ql mysql-connector-j to confirm which file names exist on your system

2. Configure logstash.conf

# Single table

input {
  jdbc {
    # MySQL connection string
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    # User name and password
    jdbc_user => "root"
    jdbc_password => "root"
    # Driver class name
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # Path to the driver JAR
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    # Enable paging
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    # Whether to discard the state saved in last_run_metadata_path; if true, every run starts over and queries all records again
    clean_run => false
    # Whether to track the value of a specific column. If record_last_run is true and you want to track one of your own columns,
    # set this to true; otherwise the timestamp of the last run is tracked
    use_column_value => true
    # Required when use_column_value is true: the name of the column to track. The column must be increasing, for example a record timestamp
    tracking_column => "unix_ts_in_secs"
    # Whether to record the last run: if true, the last tracking_column value seen is saved to the file given by last_run_metadata_path
    record_last_run => "true"
    # Type of the tracking column
    tracking_column_type => "numeric"
    # Polling schedule
    schedule => "*/5 * * * * *"
    # SQL statement to execute
    statement => "SELECT * FROM wine_address"
    # Event type, used by the conditional in the output section
    type => "es_table"
  }
}
filter {
  ruby {
    code => "event.set('@timestamp',event.get('timestamp'))"
  }
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
}
output {
  if [type]=="es_table" {
    elasticsearch {
      # Elasticsearch host and port
      hosts => ["192.168.243.134:9200"]
      # Index name
      index => "es_table_idx"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
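
The use_column_value, tracking_column and record_last_run options only have an effect when the SQL statement filters on the saved value (the jdbc input exposes it as :sql_last_value). The statement above selects the whole table, so as far as I can tell every run re-reads all rows. The sketch below shows the incremental idea in isolation. It is a simulation, not Logstash code: it uses an in-memory SQLite table as a stand-in for MySQL, assumes wine_address has a numeric unix_ts_in_secs column (the table shown in this post may not), and poll is a hypothetical helper.

```python
import sqlite3


def poll(conn, last_value):
    """Fetch only rows newer than last_value and return the new high-water mark.

    Mimics a statement such as
    SELECT ... WHERE unix_ts_in_secs > :sql_last_value ORDER BY unix_ts_in_secs ASC
    """
    rows = conn.execute(
        "SELECT address_id, address, unix_ts_in_secs FROM wine_address "
        "WHERE unix_ts_in_secs > ? ORDER BY unix_ts_in_secs ASC",
        (last_value,),
    ).fetchall()
    if rows:
        # Remember the largest tracking value seen (what record_last_run persists).
        last_value = rows[-1][2]
    return rows, last_value


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE wine_address ("
    "address_id INTEGER PRIMARY KEY, address TEXT, unix_ts_in_secs INTEGER)"
)
conn.executemany(
    "INSERT INTO wine_address VALUES (?, ?, ?)",
    [(1, "first", 100), (2, "second", 200)],
)

last = 0  # clean_run => true would reset the saved value to this starting point
first_batch, last = poll(conn, last)

conn.execute("INSERT INTO wine_address VALUES (3, 'third', 300)")
second_batch, last = poll(conn, last)

print(len(first_batch), len(second_batch), last)
```

The first poll returns both existing rows and the second returns only the newly inserted one, which is the behaviour the tracking options are meant to provide.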

# Multiple tables

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    clean_run => false
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    record_last_run => "true"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM wine_address"
    type => "es_table"
  }
  jdbc {
    jdbc_connection_string => "jdbc:mysql://192.168.243.134:3306/wine"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_driver_library => "/usr/share/java/mysql-connector-j.jar"
    jdbc_paging_enabled => true
    jdbc_page_size => "1000"
    clean_run => false
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    record_last_run => "true"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM wine_area"
    type => "es_table1"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs","timestamp"]
  }
}
output {
  if [type]=="es_table" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]
      index => "es_table_idx"
      document_id => "%{address_id}"
    }
  }
  if [type]=="es_table1" {
    elasticsearch {
      hosts => ["192.168.243.134:9200"]
      index => "es_table_idx1"
      document_id => "%{area_id}"
    }
  }
}
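
In the multi-table configuration, the type field set by each jdbc input decides which index an event goes to and which column becomes the document ID. The following sketch restates that routing as plain Python to make the mapping explicit. ROUTES and route are hypothetical names used only for illustration; Logstash evaluates the conditionals in the output section itself.

```python
# type value -> (index name, column used as the document ID), as in the config above
ROUTES = {
    "es_table": ("es_table_idx", "address_id"),
    "es_table1": ("es_table_idx1", "area_id"),
}


def route(event: dict):
    """Return (index, document_id) for an event, or None if no output matches."""
    target = ROUTES.get(event.get("type"))
    if target is None:
        return None
    index, id_field = target
    # Using the primary key as the ID means re-synced rows overwrite
    # the existing document instead of creating duplicates.
    return index, str(event[id_field])


print(route({"type": "es_table", "address_id": 5, "address": "test"}))
print(route({"type": "es_table1", "area_id": 7}))
```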

3. Start Logstash

[root@localhost logstash-8.12.2]# ./bin/logstash -f /home/logstash-8.12.2/config/logstash.conf &

4. Insert data

INSERT INTO `wine`.`wine_address` ( `address_id`, `member_id`, `area_id`, `city_id`, `province_id`, `area_info`, `address`, `mob_phone`, `reciver_name`, `is_default`, `dis_mode`, `add_time` )

VALUES( 5, 5, 5, 5, 5, '测试', '测试', 14512456789, '10', 0, '1', 0 );

5. View the data
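
One way to check that the row arrived is to query the index over HTTP. The sketch below only builds the request for the _search endpoint with a match_all query. It assumes security has been disabled as described earlier (plain HTTP, no credentials) and uses the host and index name from the configuration above; build_search_request is a hypothetical helper.

```python
import json
import urllib.request


def build_search_request(host: str, index: str, size: int = 10) -> urllib.request.Request:
    """Build a POST request that returns up to `size` documents from `index`."""
    body = json.dumps({"query": {"match_all": {}}, "size": size}).encode("utf-8")
    return urllib.request.Request(
        url=f"http://{host}/{index}/_search",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_search_request("192.168.243.134:9200", "es_table_idx")
print(req.get_method(), req.full_url)

# To actually send it (requires a reachable Elasticsearch node):
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["hits"]["total"])
```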

Method 2: go-mysql-elasticsearch

1. Install the Go environment

[root@localhost /]# wget https://golang.google.cn/dl/go1.15.4.linux-amd64.tar.gz # download

[root@localhost /]# tar -zxvf go1.15.4.linux-amd64.tar.gz # extract

[root@localhost /]# mv go /usr/local/

[root@localhost river]# vim /etc/profile # add the environment variables below

export GOROOT=/usr/local/go

export GOPATH=/root/go

export PATH=$PATH:$GOROOT/bin:$GOPATH/bin

[root@localhost /]# source /etc/profile # reload the profile

2. Download go-mysql-elasticsearch

[root@localhost /]# go get github.com/siddontang/go-mysql-elasticsearch # download

[root@localhost river]# cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch

[root@localhost go-mysql-elasticsearch]# make # build; on success an executable named go-mysql-elasticsearch is generated in go-mysql-elasticsearch/bin

3. Configuration ($GOPATH/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml)

my_addr = "192.168.243.134:3306" # MySQL address

my_user = "root" # MySQL user name

my_pass = "123456" # MySQL password

es_addr = "192.168.243.134:9200" # Elasticsearch address

es_user = "" # Elasticsearch user name

es_pass = "" # Elasticsearch password

data_dir = "/root/go/src/github.com/siddontang/go-mysql-elasticsearch/data" # data storage directory

stat_addr = "192.168.243.134:12800" # internal status address and port

stat_path = "/metrics"

server_id = 1001

flavor = "mysql"

mysqldump = "mysqldump"

bulk_size = 128

flush_bulk_time = "200ms"

skip_no_pk_table = false

[[source]]

schema = "wine" # database name

tables = ["wine_role"] # table names

[[rule]]

schema = "wine" # database name

table = "wine_role" # table name

index = "" # name of the ES index to generate; corresponds to schema

type = "" # ES type to generate; corresponds to table

filter = ["id", "name"] # only these fields are synced
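
The filter option in the rule above restricts which columns end up in the Elasticsearch document. The following sketch only models that behaviour so the effect is easy to see; it is not code from go-mysql-elasticsearch, apply_field_filter is a hypothetical helper, and the sample row (including the remark column) is made up.

```python
# Hypothetical model of the rule's `filter` option: only listed columns are kept.
def apply_field_filter(row, fields):
    """Return a copy of row containing only the columns named in fields."""
    return {name: row[name] for name in fields if name in row}


if __name__ == "__main__":
    # Made-up sample row; only "id" and "name" come from the river.toml above.
    row = {"id": 1, "name": "admin", "remark": "not synced"}
    print(apply_field_filter(row, ["id", "name"]))
```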

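4. Start

# The project documentation states that MySQL must be below version 8 and Elasticsearch below version 6

[root@localhost go-mysql-elasticsearch]# ./bin/go-mysql-elasticsearch -config=./etc/river.toml

5. View the Elasticsearch data (for example with the elasticsearch-head extension for Google Chrome)

III. canal (https://github.com/alibaba/canal/releases)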

1. Download and install canal

# Download version 1.1.7, which supports MySQL 8.0 (my MySQL is 8.0.36)

[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz # canal's client adapter; it can be regarded as the canal client

[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.admin-1.1.7.tar.gz # canal admin web UI

[root@localhost home]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz # canal server

# Create the directories

[root@localhost home]# mkdir canal

[root@localhost home]# cd canal

[root@localhost canal]# mkdir canal-adapter

[root@localhost canal]# mkdir canal-admin

[root@localhost canal]# mkdir canal-server

# Extract

[root@localhost home]# tar -zxvf canal.adapter-1.1.7.tar.gz -C ./canal/canal-adapter/

[root@localhost home]# tar -zxvf canal.admin-1.1.7.tar.gz -C ./canal/canal-admin/

[root@localhost home]# tar -zxvf canal.deployer-1.1.7.tar.gz -C ./canal/canal-server/

# Change the permissions

[root@localhost home]# chmod -R 777 canal

2. Configure

#canal-server/conf/canal.properties

#################################################

######### common argument #############

#################################################

# tcp bind ip

canal.ip = 127.0.0.1

# register ip to zookeeper

canal.register.ip =

canal.port = 11111

canal.metrics.pull.port = 11112

# canal instance user/passwd

# canal.user = canal

# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config

#canal.admin.manager = 127.0.0.1:8089

canal.admin.port = 11110

canal.admin.user = admin

canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

# admin auto register

#canal.admin.register.auto = true

#canal.admin.register.cluster =

#canal.admin.register.name =

canal.zkServers =

# flush data to zk

canal.zookeeper.flush.period = 1000

canal.withoutNetty = false

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ

canal.serverMode = tcp

# flush meta cursor/parse position to file

canal.file.data.dir = ${canal.conf.dir}

canal.file.flush.period = 1000

## memory store RingBuffer size, should be Math.pow(2,n)

canal.instance.memory.buffer.size = 16384

## memory store RingBuffer used memory unit size , default 1kb

canal.instance.memory.buffer.memunit = 1024

## memory store gets mode used MEMSIZE or ITEMSIZE

canal.instance.memory.batch.mode = MEMSIZE

canal.instance.memory.rawEntry = true

## detecting config

canal.instance.detecting.enable = false

#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()

canal.instance.detecting.sql = select 1

canal.instance.detecting.interval.time = 3

canal.instance.detecting.retry.threshold = 3

canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery

canal.instance.transaction.size = 1024

# mysql fallback connected to new master should fallback times

canal.instance.fallbackIntervalInSeconds = 60

# network config

canal.instance.network.receiveBufferSize = 16384

canal.instance.network.sendBufferSize = 16384

canal.instance.network.soTimeout = 30

# binlog filter config

canal.instance.filter.druid.ddl = true

canal.instance.filter.query.dcl = false

canal.instance.filter.query.dml = false

canal.instance.filter.query.ddl = false

canal.instance.filter.table.error = false

canal.instance.filter.rows = false

canal.instance.filter.transaction.entry = false

canal.instance.filter.dml.insert = false

canal.instance.filter.dml.update = false

canal.instance.filter.dml.delete = false

# binlog format/image check

canal.instance.binlog.format = ROW,STATEMENT,MIXED

canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation

canal.instance.get.ddl.isolation = false

# parallel parser config

canal.instance.parser.parallel = true

## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()

#canal.instance.parser.parallelThreadSize = 16

## disruptor ringbuffer size, must be power of 2

canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info

canal.instance.tsdb.enable = true

canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}

canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

canal.instance.tsdb.dbUsername = canal

canal.instance.tsdb.dbPassword = canal

# dump snapshot interval, default 24 hour

canal.instance.tsdb.snapshot.interval = 24

# purge snapshot expire , default 360 hour(15 days)

canal.instance.tsdb.snapshot.expire = 360

#################################################

######### destinations #############

#################################################

canal.destinations = example

# conf root dir

canal.conf.dir = ../conf

# auto scan instance dir add/remove and start/stop instance

canal.auto.scan = true

canal.auto.scan.interval = 5

# set this value to 'true' means that when binlog pos not found, skip to latest.

# WARN: pls keep 'false' in production env, or if you know what you want.

canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring

canal.instance.global.lazy = false

canal.instance.global.manager.address = ${canal.admin.manager}

#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml

canal.instance.global.spring.xml = classpath:spring/file-instance.xml

#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################

######### MQ Properties #############

##################################################

# aliyun ak/sk , support rds/mq

canal.aliyun.accessKey =

canal.aliyun.secretKey =

canal.aliyun.uid=

canal.mq.flatMessage = true

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

# Set this value to "cloud", if you want open message trace feature in aliyun.

canal.mq.accessChannel = local

canal.mq.database.hash = true

canal.mq.send.thread.size = 30

canal.mq.build.thread.size = 8

##################################################

######### Kafka #############

##################################################

kafka.bootstrap.servers = 127.0.0.1:9092

kafka.acks = all

kafka.compression.type = none

kafka.batch.size = 16384

kafka.linger.ms = 1

kafka.max.request.size = 1048576

kafka.buffer.memory = 33554432

kafka.max.in.flight.requests.per.connection = 1

kafka.retries = 0

kafka.kerberos.enable = false

kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf

kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf

# sasl demo

# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";

# kafka.sasl.mechanism = SCRAM-SHA-512

# kafka.security.protocol = SASL_PLAINTEXT

##################################################

######### RocketMQ #############

##################################################

rocketmq.producer.group = test

rocketmq.enable.message.trace = false

rocketmq.customized.trace.topic =

rocketmq.namespace =

rocketmq.namesrv.addr = 127.0.0.1:9876

rocketmq.retry.times.when.send.failed = 0

rocketmq.vip.channel.enabled = false

rocketmq.tag =

##################################################

######### RabbitMQ #############

##################################################

rabbitmq.host =

rabbitmq.virtual.host =

rabbitmq.exchange =

rabbitmq.username =

rabbitmq.password =

rabbitmq.deliveryMode =

##################################################

######### Pulsar #############

##################################################

pulsarmq.serverUrl =

pulsarmq.roleToken =

pulsarmq.topicTenantPrefix =
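
A note on canal.admin.passwd and the commented-out canal.passwd in the file above: both are stored as 40-character hexadecimal digests rather than plain text. As far as I know, this is the format MySQL's legacy PASSWORD() function produced (SHA-1 applied twice, without the leading asterisk), but treat that as an assumption and compare the output with the value in your own canal.properties. The function name mysql_style_hash and the trial password "admin" are made up for this sketch.

```python
import hashlib


def mysql_style_hash(password):
    """Return the upper-case hex digest of SHA1(SHA1(password)), without a leading '*'."""
    first = hashlib.sha1(password.encode("utf-8")).digest()
    return hashlib.sha1(first).hexdigest().upper()


if __name__ == "__main__":
    # "admin" is only a guess at the plain-text password; compare the printed
    # digest with canal.admin.passwd before relying on it.
    print(mysql_style_hash("admin"))
```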

#/home/canal/canal-server/conf/example/instance.properties

#################################################

## mysql serverId , v1.0.26+ will autoGen

canal.instance.mysql.slaveId=200

# enable gtid use true/false

canal.instance.gtidon=false

# position info

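# MySQL address and port

canal.instance.master.address=127.0.0.1:3306

# binlog file to start syncing from; use the file name reported by your own MySQL server

canal.instance.master.journal.name=mysql-bin.000045

# position in the binlog file to start syncing from

canal.instance.master.position=237

# point in time to start syncing from, as a timestamp (13 digits, i.e. milliseconds)

canal.instance.master.timestamp=1709112558000

# GTID to start syncing from

canal.instance.master.gtid=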

# rds oss binlog

canal.instance.rds.accesskey=

canal.instance.rds.secretkey=

canal.instance.rds.instanceId=

# table meta tsdb info

canal.instance.tsdb.enable=true

#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

#canal.instance.tsdb.dbUsername=canal

#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#canal.instance.standby.gtid=

# username/password

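# database user name (keep comments on their own line: a .properties file treats text after the value as part of the value)

canal.instance.dbUsername=root

# database password

canal.instance.dbPassword=123456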

canal.instance.connectionCharset = UTF-8

# enable druid Decrypt database password

canal.instance.enableDruid=false

#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex

canal.instance.filter.regex=.*\\..*

# table black regex

# MySQL databases/tables that are excluded from syncing

canal.instance.filter.black.regex=mysql\\.slave_.*

# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config

canal.mq.topic=example

# dynamic topic route by schema or table regex

#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*

canal.mq.partition=0

# hash partition config

#canal.mq.enableDynamicQueuePartition=false

#canal.mq.partitionsNum=3

#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

#canal.mq.partitionHash=test.table:id^name,.*\\..*

#

# multi stream for polardbx

canal.instance.multi.stream.on=false

#################################################
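
The value of canal.instance.master.timestamp in the file above is hard to read at a glance. A small sketch for converting between such a millisecond timestamp and a readable date (ms_to_utc and utc_to_ms are helper names made up for this example):

```python
from datetime import datetime, timezone


def ms_to_utc(ms):
    """Convert a millisecond Unix timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


def utc_to_ms(dt):
    """Convert an aware datetime to a millisecond Unix timestamp."""
    return int(dt.timestamp() * 1000)


if __name__ == "__main__":
    start = ms_to_utc(1709112558000)  # value of canal.instance.master.timestamp
    print(start.isoformat())  # 2024-02-28T09:29:18+00:00
    print(utc_to_ms(start))   # 1709112558000
```

In the GMT+8 time zone used elsewhere in this setup, that moment is 17:29:18 on 2024-02-28.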

#/home/canal/canal-adapter/conf/application.yml

server:

  port: 8081 # adapter port

spring:

  jackson:

    date-format: yyyy-MM-dd HH:mm:ss

    time-zone: GMT+8

    default-property-inclusion: non_null

canal.conf:

  mode: tcp # how the canal server is read: tcp, kafka, rocketMQ or rabbitMQ

  flatMessage: true # flat message switch: deliver data as JSON strings; only effective in kafka/rocketMQ mode

  zookeeperHosts: # ZooKeeper address in cluster mode; if canalServerHost is configured, canalServerHost takes precedence

  syncBatchSize: 1000 # batch size

  retries: -1 # number of retries; -1 means block indefinitely

  timeout: # time allowed for fetching data

  accessKey:

  secretKey:

  consumerProperties:

    # canal tcp consumer

    canal.tcp.server.host: 192.168.243.134:11111

    canal.tcp.zookeeper.hosts:

    canal.tcp.batch.size: 500

    canal.tcp.username:

    canal.tcp.password:

    # kafka consumer

    kafka.bootstrap.servers: 127.0.0.1:9092

    kafka.enable.auto.commit: false

    kafka.auto.commit.interval.ms: 1000

    kafka.auto.offset.reset: latest

    kafka.request.timeout.ms: 40000

    kafka.session.timeout.ms: 30000

    kafka.isolation.level: read_committed

    kafka.max.poll.records: 1000

    # rocketMQ consumer

    rocketmq.namespace:

    rocketmq.namesrv.addr: 127.0.0.1:9876

    rocketmq.batch.size: 1000

    rocketmq.enable.message.trace: false

    rocketmq.customized.trace.topic:

    rocketmq.access.channel:

    rocketmq.subscribe.filter:

    # rabbitMQ consumer

    rabbitmq.host:

    rabbitmq.virtual.host:

    rabbitmq.username:

    rabbitmq.password:

    rabbitmq.resource.ownerId:

  srcDataSources: # where the data comes from

    defaultDS:

      url: jdbc:mysql://192.168.243.134:3306/wine?useUnicode=true

      username: root

      password: 123456

  canalAdapters: # where the data goes

  - instance: example # canal instance name or MQ topic name; corresponds to the canal destination or MQ topic

    groups: # adapter groups

    - groupId: g1 # group id

      outerAdapters: # adapter list

      - name: logger

#      - name: rdb

#        key: mysql1

#        properties:

#          jdbc.driverClassName: com.mysql.jdbc.Driver

#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true

#          jdbc.username: root

#          jdbc.password: 121212

#          druid.stat.enable: false

#          druid.stat.slowSqlMillis: 1000

#      - name: rdb

#        key: oracle1

#        properties:

#          jdbc.driverClassName: oracle.jdbc.OracleDriver

#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE

#          jdbc.username: mytest

#          jdbc.password: m121212

#      - name: rdb

#        key: postgres1

#        properties:

#          jdbc.driverClassName: org.postgresql.Driver

#          jdbc.url: jdbc:postgresql://localhost:5432/postgres

#          jdbc.username: postgres

#          jdbc.password: 121212

#          threads: 1

#          commitSize: 3000

#      - name: hbase

#        properties:

#          hbase.zookeeper.quorum: 127.0.0.1

#          hbase.zookeeper.property.clientPort: 2181

#          zookeeper.znode.parent: /hbase

      - name: es8 # I am using ES 8

        hosts: http://192.168.243.134:9200 # 127.0.0.1:9200 for rest mode; in rest mode the address needs the http:// prefix, transport mode uses port 9300

        properties:

          mode: rest # transport or rest

          # security.auth: test:123456 # only used for rest mode

          cluster.name: elasticsearch # cluster name

#      - name: kudu

#        key: kudu

#        properties:

#          kudu.master.address: 127.0.0.1 # ',' split multi address

#      - name: phoenix

#        key: phoenix

#        properties:

#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver

#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db

#          jdbc.username:

#          jdbc.password:

#/home/canal/canal-adapter/conf/es8/mytest_user.yml

dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources in application.yml

destination: example

groupId: g1 # group id

esMapping:

  _index: mytest_user # index

  _id: _id # document id; if this item is not configured, the pk item below must be configured, otherwise ES assigns the _id automatically

  _type: _doc # type

  upsert: true # insert the document if it does not exist yet

#  pk: id

  sql: "select a.album_id as _id, a.album_name, a.class_name, a.mark, a.goods_sku, a.add_time, a.merchant_id from wine_album a"

#  objFields:

#    _labels: array:;

  etlCondition: "where a.add_time>={}"

  commitBatch: 3000 # batch commit size
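
The {} placeholder in etlCondition is only filled in when a full import is triggered by hand. As I recall from the canal ClientAdapter documentation, the adapter exposes a REST endpoint of the form POST /etl/{adapter type}/{mapping file} that takes a params form field; the exact path for es8 and the adapter host used below are assumptions, so check them against your adapter version. The sketch only builds the request (build_etl_request is a made-up helper) and leaves sending it commented out.

```python
import urllib.parse
import urllib.request

# Port 8081 is server.port from application.yml; the host is an assumption.
ADAPTER = "http://192.168.243.134:8081"


def build_etl_request(task="mytest_user.yml", params="0", adapter=ADAPTER, kind="es8"):
    """Build (but do not send) a POST request asking the adapter for a full import."""
    body = urllib.parse.urlencode({"params": params}).encode("ascii")
    return urllib.request.Request(f"{adapter}/etl/{kind}/{task}", data=body, method="POST")


if __name__ == "__main__":
    req = build_etl_request(params="0")  # add_time >= 0, i.e. every row
    print(req.get_method(), req.full_url, req.data)
    # To actually send it from a machine that can reach the adapter:
    # print(urllib.request.urlopen(req, timeout=30).read().decode())
```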

3. Start

[root@localhost canal-server]# ./bin/startup.sh

[root@localhost canal-adapter]# ./bin/startup.sh # the adapter also has to be running for the adapter log in step 4 to appear

4. Check the logs

# Server log (/home/canal/canal-server/logs)

[root@localhost canal]# tail -5f canal.log

2024-02-28 14:47:50.369 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler

2024-02-28 14:47:50.375 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations

2024-02-28 14:47:50.383 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.

2024-02-28 14:47:50.414 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[127.0.0.1(127.0.0.1):11111]

2024-02-28 14:47:51.318 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

# Log of the example instance

[root@localhost example]# tail -5f example.log

2024-02-28 14:47:51.284 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$

2024-02-28 14:47:51.289 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

2024-02-28 14:47:51.342 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position

2024-02-28 14:47:51.342 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status

2024-02-28 14:47:51.749 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000044,position=4,serverId=1,gtid=,timestamp=1709092749000] cost : 403ms , the next step is binlog dump
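
The "init table black filter : ^mysql\.slave_.*$" line in the log above is the canal.instance.filter.black.regex setting from instance.properties after the .properties escaping has been removed. Table names are matched in schema.table form. The sketch below uses Python's re module to show which names the two configured patterns let through; canal uses its own Java matcher, so this assumes both engines agree on these simple patterns, and is_synced is a hypothetical helper.

```python
import re

# Patterns from instance.properties after .properties unescaping (\\ becomes \).
INCLUDE = re.compile(r".*\..*")           # canal.instance.filter.regex
EXCLUDE = re.compile(r"mysql\.slave_.*")  # canal.instance.filter.black.regex


def is_synced(schema, table):
    """True if schema.table matches the include regex and not the black regex."""
    name = f"{schema}.{table}"
    return bool(INCLUDE.fullmatch(name)) and not EXCLUDE.fullmatch(name)


if __name__ == "__main__":
    print(is_synced("wine", "wine_album"))          # True
    print(is_synced("mysql", "slave_master_info"))  # False
```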

# Adapter log (/home/canal/canal-adapter/logs/adapter)

[root@localhost logs]# tail -10f ./adapter/adapter.log

2024-02-28 18:56:58.798 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed

2024-02-28 18:56:59.017 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...

2024-02-28 18:56:59.028 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded

2024-02-28 18:56:59.242 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es8 succeed

2024-02-28 18:56:59.248 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /home/canal/canal-adapter/plugin

2024-02-28 18:56:59.267 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed

2024-02-28 18:56:59.267 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......

2024-02-28 18:56:59.267 [Thread-3] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============

2024-02-28 18:56:59.273 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 2.381 seconds (JVM running for 2.951)

2024-02-28 18:56:59.356 [Thread-3] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============

2024-02-28 18:58:32.556 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2","goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine","destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album","ts":1709117912427,"type":"INSERT"}

2024-02-28 18:58:32.805 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2","goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine","destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album","ts":1709117912427,"type":"INSERT"}

Affected indexes: mytest_user
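
Each DML line in the adapter log carries the change as a JSON payload, which is handy when checking what was actually synced. A small sketch that pulls the operation type, table and primary-key values out of such a line (summarize_dml is a made-up helper; the payload is copied from the log above):

```python
import json

# Payload copied from the adapter log line above (the text starting at "DML: ").
LINE = ('DML: {"data":[{"album_id":4,"album_name":"2","class_name":6,"mark":"2",'
        '"goods_sku":"2","add_time":2,"merchant_id":2}],"database":"wine",'
        '"destination":"example","es":1709117912000,"groupId":"g1","isDdl":false,'
        '"old":null,"pkNames":["album_id"],"sql":"","table":"wine_album",'
        '"ts":1709117912427,"type":"INSERT"}')


def summarize_dml(line):
    """Parse the JSON after 'DML: ' and return (type, database.table, primary-key values)."""
    payload = json.loads(line.split("DML: ", 1)[1])
    pks = [[row[name] for name in payload["pkNames"]] for row in payload["data"]]
    return payload["type"], f'{payload["database"]}.{payload["table"]}', pks


if __name__ == "__main__":
    print(summarize_dml(LINE))  # ('INSERT', 'wine.wine_album', [[4]])
```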

5. View the ES index data
