本篇文章主要是安装logstash
的插件logstash-input-jdbc
.用于将SQL
的数据同步到ElasticSearch
.日后会对Logstash
进行学习.这个flag立的很好~
下载logstash
该版本为7.1.如需其他版本请另从官网下载传送门
https://www.elastic.co/cn/downloads/logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.1.1.tar.gz
tar -zxvf logstash-7.1.1.tar.gz
cd logstash-7.1.1/
./bin/logstash -e 'input { stdin {} } output { stdout {} }'
# 输入hello world 后稍等片刻会返回信息
{
"@version" => "1",
"message" => "hello world",
"@timestamp" => 2019-06-17T14:06:52.796Z,
"host" => "erhuadamowangdeMacBook-Pro.local"
}
安装成功.可能会有些提示错误信息,无伤大雅的
安装logstash-input-jdbc
# 在logstash-7.1.1目录下安装jdbc拓展
./bin/logstash-plugin install logstash-input-jdbc
# 提示下列信息表示安装成功
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
创建数据库
CREATE TABLE `sys_sql_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(12) NOT NULL DEFAULT 'anonymous',
`mail` varchar(150) DEFAULT NULL,
`create_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8
插入数据
insert into sys_sql_test
(name,mail,create_time,update_time)
values
('二滑大魔王','qvbilam@163.com',now(),now()),
('二滑小天使','qvbilam@163.com',now(),now()),
('彩霞大笨猪','qvbilam@163.com',now(),now()),
('彩霞小呆瓜','qvbilam@163.com',now(),now());
创建ES索引
PUT :
http://localhost:8101/syn_es_test/
{
"settings":{
"number_of_shards":5,
"number_of_replicas":1
},
"mappings":{
"properties":{
"id":{
"type":"integer"
},
"name":{
"type":"text"
},
"mail":{
"type":"text"
},
"create_time":{
"type":"date"
},
"update_time":{
"type":"date"
}
}
}
}
配置input-jdbc
# 文件移动到自己喜欢的位置:)
cp -r logstash-7.1.1 /usr/local/Cellar/logstash
cd /usr/local/Cellar/logstash/bin
# 创建配置文件jdbc.conf和jdbc.sql
touch jdbc.conf jdbc.sql
jdbc.conf配置
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/video?useSSL=false"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/usr/local/Cellar/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
tracking_column => update_time
record_last_run => true
jdbc_default_timezone => "Asia/Shanghai"
statement_filepath => "/usr/local/Cellar/logstash/bin/jdbc.sql"
clean_run => false
schedule => "* * * * *"
}
}
output {
elasticsearch {
hosts => "127.0.0.1:8101"
index => "syn_es_test"
document_id => "%{id}"
document_type => "_doc"
}
stdout {
codec => json_lines
}
}
conf参数说明
健名 | 健值 | 说明 |
---|---|---|
jdbc_connection_string | jdbc:mysql://地址:端口号/数据库名?useSSL=false | 不使用SSL链接 |
jdbc_user | 数据库用户 | 数据库用户 |
jdbc_password | 用户密码 | 用户密码 |
jdbc_driver_library | 链接库的驱动包路径 | 自己下载放到你想要的目录 |
jdbc_driver_class | com.mysql.jdbc.Driver | 驱动类名 |
use_column_value | true/通过时间更新 | 如果用其他字段更新.这里一定要设置成flase .或者不写. |
tracking_column | update_time | 跟踪字段,通过自定义字段同步 |
jdbc_default_timezone | Asia/Shanghai | 设置时区/上海 |
statement_filepath | Sql语句文件路径 | 文件路径,自己创建 |
clean_run | false | 为true每次都相当于从头开始查询所有的数据库记录. |
schedule | * | 每分钟执行一次 |
hosts | 地址:端口号 | 链接ES |
index | - | ES中索引名,相当于库 |
document_id | %{id} | 主键名称 |
document_type | _doc | 类型名,ES7.0后只能是_doc .其他版本可自定义类型名 |
codec | json_lines | JSON格式输出 |
jdbc.sql配置
-- 纯属为了方便阅读换个行
select * from goods
where
update_time > :sql_last_value
同步执行
cd /usr/local/Cellar/logstash/bin
./logstash -f jdbc.conf
# 你可能会出现很INFO和WARN.先不用管
# 因为这是logstash的问题
# 我们主要是做数据同步,以后再分析问题
# 当出现下面信息说明已经开始对数据库查询.更新了/每分钟
[2019-06-18T13:48:00,044][INFO ][logstash.inputs.jdbc ] (0.000863s) SELECT version()
[2019-06-18T13:48:00,052][INFO ][logstash.inputs.jdbc ] (0.000746s) SELECT version()
[2019-06-18T13:48:00,069][INFO ][logstash.inputs.jdbc ] (0.001205s) SELECT count(*) AS `count` FROM (select * from sys_sql_test where update_time > '2019-06-18 13:47:00') AS `t1` LIMIT 1
插入同步
mysql> use video;
mysql> insert into sys_sql_test
-> (name,mail,create_time,update_time)
-> values
-> ('羡仙','qvbilam@163.com',now(),now()),
-> ('飘摇','qvbilam@163.com',now(),now()),
-> ('紫檀','qvbilam@163.com',now(),now());
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0
logstash输出信息
[2019-06-18T13:53:00,290][INFO ][logstash.inputs.jdbc ] (0.000657s) SELECT version()
[2019-06-18T13:53:00,293][INFO ][logstash.inputs.jdbc ] (0.000645s) SELECT version()
[2019-06-18T13:53:00,300][INFO ][logstash.inputs.jdbc ] (0.001119s) SELECT count(*) AS `count` FROM (select * from sys_sql_test where update_time > '2019-06-18 13:52:00') AS `t1` LIMIT 1
[2019-06-18T13:53:00,385][INFO ][logstash.inputs.jdbc ] (0.001264s) SELECT * FROM (select * from sys_sql_test where update_time > '2019-06-18 13:52:00') AS `t1` LIMIT 50000 OFFSET 0
{"update_time":"2019-06-18T05:52:10.000Z","id":48,"@version":"1","mail":"qvbilam@163.com","name":"飘摇","create_time":"2019-06-18T05:52:10.000Z","@timestamp":"2019-06-18T05:53:00.598Z"}
{"update_time":"2019-06-18T05:52:10.000Z","id":49,"@version":"1","mail":"qvbilam@163.com","name":"紫檀","create_time":"2019-06-18T05:52:10.000Z","@timestamp":"2019-06-18T05:53:00.601Z"}
{"update_time":"2019-06-18T05:52:10.000Z","id":47,"@version":"1","mail":"qvbilam@163.com","name":"羡仙","create_time":"2019-06-18T05:52:10.000Z","@timestamp":"2019-06-18T05:53:00.573Z"}
这个时刻总是令人心惊胆战.不成功便..修改问题吧,瑟瑟发抖的查看ES
没问题~开心的跳起来了
更新同步
mysql> insert into sys_sql_test
-> (name,mail,create_time,update_time)
-> values
-> ('羡仙','qvbilam@163.com',now(),now()),
-> ('飘摇','qvbilam@163.com',now(),now()),
-> ('紫檀','qvbilam@163.com',now(),now());
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0
mysql> update sys_sql_test
-> set name='橙池',update_time=now()
-> where id=49;
Query OK, 1 row affected (0.09 sec)
查看ES
# 哦~讨厌,又是这该死的成功感 :)
[2019-06-18T13:58:00,189][INFO ][logstash.inputs.jdbc ] (0.000732s) SELECT version()
[2019-06-18T13:58:00,194][INFO ][logstash.inputs.jdbc ] (0.000713s) SELECT version()
[2019-06-18T13:58:00,201][INFO ][logstash.inputs.jdbc ] (0.001025s) SELECT count(*) AS `count` FROM (select * from sys_sql_test where update_time > '2019-06-18 13:57:00') AS `t1` LIMIT 1
[2019-06-18T13:58:00,216][INFO ][logstash.inputs.jdbc ] (0.001045s) SELECT * FROM (select * from sys_sql_test where update_time > '2019-06-18 13:57:00') AS `t1` LIMIT 50000 OFFSET 0
{"update_time":"2019-06-18T05:57:18.000Z","id":49,"@version":"1","mail":"qvbilam@163.com","name":"橙池","create_time":"2019-06-18T05:52:10.000Z","@timestamp":"2019-06-18T05:58:00.221Z"
删除更新
你不会真的信了吧.这当然是不可能的啦~怎么会删除更新呢,小傻瓜.既然删除更新不了我们可以设置一个新的字段.例如delete_time
实现假删除.每次判断一下delete_time
是否为空.但是我感觉应该会像Mysql
一样.Mysql
难以优化引用可空列查询,它会使索引、索引统计和值更加复杂.所以就设置个默认值,大于.所以自己动脑子优化哈~
关于错误
在学习的道路中,跌跌撞撞的.踩了很多坑,举几个比较常见的错误吧.
错误-1
logstash启动报错
# 部分截图
[2019-06-18T14:38:24,120][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-06-18T14:38:24,193][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.1.1"}
[2019-06-18T14:38:38,918][WARN ][logstash.outputs.elasticsearch] You are using a deprecated config setting "document_type" set in elasticsearch. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"document_type", :plugin=><LogStash::Outputs::ElasticSearch index=>"syn_es_test", id=>"72c6a63b9739aecf18d8907e9cc5fda24213fbaa2e2a3469f4df0736aa488e0f", document_id=>"%{id}", hosts=>[//127.0.0.1:8101], document_type=>"_doc", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_bace4650-c0e9-4cf5-82a6-c12fff8ae458", enable_metric=>true, charset=>"UTF-8">, workers=>1, manage_template=>true, template_name=>"logstash", template_overwrite=>false, doc_as_upsert=>false, script_type=>"inline", script_lang=>"painless", script_var_name=>"event", scripted_upsert=>false, retry_initial_interval=>2, retry_max_interval=>64, retry_on_conflict=>1, ilm_enabled=>"auto", ilm_rollover_alias=>"logstash", ilm_pattern=>"{now/d}-000001", ilm_policy=>"logstash-policy", action=>"index", ssl_certificate_verification=>true, sniffing=>false, sniffing_delay=>5, timeout=>60, pool_max=>1000, pool_max_per_route=>100, resurrect_delay=>5, validate_after_inactivity=>10000, http_compression=>false>}
[2019-06-18T14:38:40,174][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:8101/]}}
[2019-06-18T14:38:40,703][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://127.0.0.1:8101/"}
[2019-06-18T14:38:40,847][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2019-06-18T14:38:40,858][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
解决
可以看到报了很多的错啊。其中ES
说You are using a deprecated config setting "document_type" set in elasticsearch
.这是因为ES7.0
的type
只能是_doc
.说不推荐的设置,其实这个问题也无伤大雅.你可以在设置中去取消document_type
的设置看看还会报错吗.我就不测了。
[WARN ]
[logstash.config.source.multilocal]
Ignoring the 'pipelines.yml' file because modules or command line options are specified
这个问题!我还没有去研究.应该是配置文件启动跟管道有关系.这里主要是讲数据同步.对于Logstash
以后会进行深入的学习.到时候在研究哈~
错误-2
先给我的配置文件加个错误的配置asda=asd
再启动
# 部分错误
[2019-06-18T14:47:06,383][ERROR]
[logstash.agent ] Failed to execute action
{:action=>LogStash::PipelineAction::Create/pipeline_id:main,
:exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 18, column 13 (byte 702)
after input {\n stdin {\n }\n jdbc { \n jdbc_connection_string => \"jdbc:mysql://127.0.0.1:3306/video?useSSL=false\"\n jdbc_user => \"root\"\n jdbc_password => \"root\"\n jdbc_driver_library => \"/usr/local/Cellar/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.44-bin.jar\"\n jdbc_driver_class => \"com.mysql.jdbc.Driver\"\n jdbc_paging_enabled => \"true\"\n jdbc_page_size => \"50000\"\n tracking_column => update_time\n record_last_run => true\n jdbc_default_timezone => \"Asia/Shanghai\" \n statement_filepath => \"/usr/local/Cellar/logstash/bin/jdbc.sql\"\n clean_run => false\n schedule => \"* * * * *\"\n asda", :backtrace=>["/usr/local/Cellar/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/local/Cellar/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/local/Cellar/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2577:in `map'", "/usr/local/Cellar/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:151:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/local/Cellar/logstash/logstash-core/lib/logstash/java_pipeline.rb:23:in `initialize'", "/usr/local/Cellar/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/local/Cellar/logstash/logstash-core/lib/logstash/agent.rb:325:in `block in converge_state'"]}
[2019-06-18T14:47:06,882][INFO ]
[logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2019-06-18T14:47:11,690][INFO ]
[logstash.runner ] Logstash shut down.
解决
可以看到logstash自己就关闭了.错误提示Failed to execute action
,执行操作失败.也就是我的配置又问题.他把我的配置直接贴过来弄的乱七八糟的.我都不想看我自己哪错了....遇到这样的问题找出错误改正就好~
错误-3
修改链接数据库的参数.就用SSL链接!
# 原来的
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/video?useSSL=false"
# 修改成
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/video"
重启执行
# 部分错误
Tue Jun 18 14:54:02 CST 2019
WARN: Establishing SSL connection without server's identity verification is not recommended.
According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set.
For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'.
You need either to explicitly disable SSL by setting useSSL=false,
or set useSSL=true and provide truststore for server certificate verification.
提示SSL链接的问题.咱没有证书,试试修改一条数据看能成功吗
mysql> update sys_sql_test
-> set name='error',update_time=now()
-> where id=49;
Query OK, 1 row affected (0.03 sec)
Rows matched: 1 Changed: 1 Warnings: 0
结果显示是成功的.
解决
这个问题的解决就是把?useSSL=false
加回来
# 正确配置
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/video?useSSL=false"
# 当然还有更牛逼的参数配置
?useSSL=false&characterEncoding=UTF-8&autoReconnect=true
# 总结起来就是禁止SSL;设置字符集;自动重连.
时区不一致怎么解决
https://www.cntofu.com/book/52/filter/date.md 我觉得数据中带入时间戳比较好