Full and Incremental Synchronization from MySQL to Elasticsearch with Logstash

1. Download a Logstash version that matches your Elasticsearch version.

2. Prepare the JDBC driver:

e.g. mysql-connector-java-8.0.18.jar, and place it under logstash/logstash-core/lib/jars, otherwise Logstash will report that the driver cannot be found.

3. Write the configuration file, as shown below:

input {
    stdin {
    }
    jdbc {            ### data source settings; one jdbc block per table, add more blocks for more tables
      type => "memories"    ### used to route events on output and written into Elasticsearch, so if the SQL result also contains a "type" column the sync may fail
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/memories"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "/home/ubuntu/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.18.jar"   ### location of the JDBC driver jar
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      lowercase_column_names => false       ### whether to lowercase column names automatically
      statement_filepath => "/home/ubuntu/sql/memories.sql"       ### path to the SQL file, or inline it with statement => "YOUR SQL"
      last_run_metadata_path => "/home/ubuntu/log/last_memories.txt"    ### where the last-run value is recorded
      clean_run => false    ### whether to wipe the last-run record on startup; keep it false for incremental sync so sql_last_value is preserved between runs
      schedule => "* * * * *"   ### cron-style sync schedule
    }
    jdbc {    ### second table
      type => "user"
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/memories"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "/home/ubuntu/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.18.jar"
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      lowercase_column_names => false
      statement_filepath => "/home/ubuntu/sql/user.sql"
      last_run_metadata_path => "/home/ubuntu/log/last_user.txt"
      clean_run => false
      schedule => "* * * * *"
    }
}

filter {   ### filters
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {   ### match the output by the type set on the input
    if [type] == "memories" {
        elasticsearch {
            hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
            index => "memories"   ### index name
            document_id => "%{mem_id}"   ### document ID; keep it unique, otherwise documents overwrite each other and the sync appears to fail
            template_overwrite => true   ### overwrite an existing template with the same name
            template => "/home/ubuntu/template/memories_new.json"  ### path to the ES template
            template_name => "memories"   ### template name; must not be duplicated
        }
    }
    if [type] == "user" {
        elasticsearch {
            hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
            index => "user"
            document_id => "%{user_id}"
            template_overwrite => true
            template => "/home/ubuntu/template/user_new.json"
            template_name => "user"
        }
    }
    stdout {   ### also print each event to the console as JSON
        codec => json_lines
    }
}
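Between runs, Logstash persists the value of :sql_last_value in the file given by last_run_metadata_path. For a timestamp-tracked sync it is a tiny YAML document along these lines (the exact format can vary by Logstash version; the timestamp below is only illustrative):

```
--- 2021-06-01 12:00:00.000000000 Z
```

Deleting this file (or setting clean_run => true) resets the sync, so the next scheduled run behaves like a full sync.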

The comments in the configuration above must be deleted, otherwise the pipeline will fail to start.

  • For a full sync, the SQL statement can be written as:
SELECT id mem_id, id, title, banner, user_id userId, UNIX_TIMESTAMP(create_time) createTime, UNIX_TIMESTAMP(update_time) updateTime, open_status openStatus, status, platform FROM memories
  • For an incremental sync, the SQL statement can be written as:
SELECT id mem_id, id, title, banner, user_id userId, UNIX_TIMESTAMP(create_time) createTime, UNIX_TIMESTAMP(update_time) updateTime, open_status openStatus, status, platform FROM memories WHERE update_time >= :sql_last_value OR create_time >= :sql_last_value ORDER BY create_time ASC, update_time ASC

Tip: the separate mem_id alias is used as the document ID in ES, because the plain id could collide with IDs from other sources and break the sync.
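To see what the incremental statement actually selects, here is a small standalone sketch using SQLite in place of MySQL (SQLite has no UNIX_TIMESTAMP(), so epoch-second integer columns are assumed; the `?` parameter plays the role Logstash gives to :sql_last_value, and the table and columns are cut down from the example above):

```python
import sqlite3

# In-memory stand-in for the MySQL "memories" table; create_time/update_time
# hold epoch seconds directly.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE memories (id INTEGER PRIMARY KEY, title TEXT, "
    "create_time INTEGER, update_time INTEGER)"
)
conn.executemany("INSERT INTO memories VALUES (?, ?, ?, ?)", [
    (1, "old row",     1000, 1000),   # untouched since the last run
    (2, "updated row", 1000, 2000),   # updated after the last run
    (3, "new row",     3000, 3000),   # created after the last run
])

# The value Logstash would have recorded after the previous run.
sql_last_value = 1500

# Same shape as the incremental statement: id is aliased to mem_id so it can
# serve as the Elasticsearch document_id.
rows = conn.execute(
    "SELECT id AS mem_id, id, title, create_time, update_time FROM memories "
    "WHERE update_time >= ? OR create_time >= ? "
    "ORDER BY create_time ASC, update_time ASC",
    (sql_last_value, sql_last_value),
).fetchall()

print(rows)  # only rows created or updated since the last run
```

Row 1 is skipped, while the updated and newly created rows are picked up, which is exactly why the WHERE clause checks both update_time and create_time.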

  • The ES template:
{
        "index_patterns": "memories",  ### in roughly 7.x and later this field is index_patterns; older versions used template instead; remove this note
        "version": 7040299,
        "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 1,
                "index.refresh_interval": "5s"
        },
        "mappings": {
                "properties": {
                        "banner": {
                                "type": "text",
                                "fields": {
                                        "keyword": {
                                                "type": "keyword",
                                                "ignore_above": 256
                                        }
                                }
                        },
                        "bannerUrl": {
                                "type": "text",
                                "fields": {
                                        "keyword": {
                                                "type": "keyword",
                                                "ignore_above": 256
                                        }
                                }
                        },
                        "createTime": {
                                "type": "long"
                        },
                        "id": {
                                "type": "long"
                        },
                        "mem_id": {
                                "type": "long"
                        },
                        "openStatus": {
                                "type": "long"
                        },
                        "platform": {
                                "type": "long"
                        },
                        "status": {
                                "type": "long"
                        },
                        "title": {
                                "type": "text",
                                "analyzer":"ik_max_word",    ### configure the ik analyzer; remove this note
                                "search_analyzer":"ik_smart"
                        },
                        "updateTime": {
                                "type": "long"
                        },
                        "userId": {
                                "type": "long"
                        }
                }
        }
}
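The ### notes above are not valid JSON, so they must be stripped before the template is submitted. A throwaway sketch of that cleanup (the regex and the inline sample are illustrative only, not part of Logstash or ES; it would mangle a template whose string values themselves contain "###"):

```python
import json
import re

# A fragment of the template with the inline ### notes still present.
raw = '''{
    "index_patterns": "memories",  ### remove before use
    "settings": {
        "number_of_shards": 1  ### remove before use
    }
}'''

# Drop everything from '###' to the end of each line, including the
# whitespace immediately before the marker.
cleaned = re.sub(r"\s*###[^\n]*", "", raw)

# Raises json.JSONDecodeError if the result is still not valid JSON.
template = json.loads(cleaned)
print(template["index_patterns"])
```

Running the real template file through json.loads the same way is a quick sanity check before POSTing it.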
  • To check whether the template is valid, add it through the ES REST API or Kibana:
POST _template/memories
{
    ### template body
}

If something is wrong, ES returns an error message; a broken template is one of the common causes of failed syncs.

4. Run Logstash:

root@ubuntu:/home/ubuntu# ./logstash/bin/logstash -f ./logstash.conf
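Before starting the pipeline for real, Logstash can check the configuration syntax and exit, which catches leftover comments and typos early (same paths as above assumed):

```shell
# Validate logstash.conf without starting the pipeline
./logstash/bin/logstash -f ./logstash.conf --config.test_and_exit
```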