Reading MySQL data into ELK via logstash-input-jdbc

Recently a project needed to read data stored in MySQL into ELK so that front-end users could pull related records and statistics through predefined dashboards. I have been using Logstash for a long time but had never used the input-jdbc plugin, so I am summarizing the setup below for future reference.

First, assuming Logstash is already installed in the environment, remember to install the required plugin. The star this time is logstash-input-jdbc, so install it with logstash-plugin install logstash-input-jdbc:

# logstash-plugin install logstash-input-jdbc
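To confirm the plugin was installed, you can list the installed plugins and filter for it (on an RPM/DEB install, logstash-plugin typically lives under /usr/share/logstash/bin):

# logstash-plugin list | grep jdbc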

After installing it, don't forget to download and install the MySQL JDBC connector.

I'm on MySQL 5.x, so I downloaded it from https://dev.mysql.com/downloads/connector/j/5.1.html

After downloading, extract the archive and copy mysql-connector-java-5.1.47-bin.jar to the path that the Logstash config will point to later.
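Roughly like this (a minimal sketch; the archive URL and the target JVM lib path are assumptions taken from the jdbc_driver_library setting used below, so adjust them for your environment):

# wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.47.tar.gz
# tar zxf mysql-connector-java-5.1.47.tar.gz
# cp mysql-connector-java-5.1.47/mysql-connector-java-5.1.47-bin.jar /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-2.el7_6.x86_64/lib/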

Create my-logstash-input-jdbc.conf under /etc/logstash/conf.d with the content below, then restart the Logstash service (see the commands after the config):

input {
  jdbc {
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_driver_library => "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-2.el7_6.x86_64/lib/mysql-connector-java-5.1.47-bin.jar"
    jdbc_connection_string => "jdbc:mysql://[MySQL_IP_Address]:3306/[DataBase_Name]"
    jdbc_user => "[MySQL_User_Name]"
    jdbc_password => "[MySQL_User_Password]"
    schedule => "*/1 * * * *"  # run once every minute
    statement => "select a.id as id,a.chatroom_id,a.profile_id,a.messageable_type,a.messageable_id,a.reply_token,a.sent,a.created_at,a.updated_at,b.message_id,b.type,b.text,c.line_user_id,c.display_name,c.picture_url,c.status_message from messages as a left outer join message_texts as b on a.messageable_id=b.id left outer join profiles as c on a.profile_id=c.id where a.id > :sql_last_value;"
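    # :sql_last_value is substituted by the plugin with the value saved from the
    # previous run (the tracking_column configured below), so each scheduled run
    # only fetches rows newer than the last recorded endpoint.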


    type => "jdbc"

    # Whether to record where the last run ended. If true, the value of the column specified by tracking_column is kept and written to the file at last_run_metadata_path.
    record_last_run => "true"

    # Whether to track a custom column value. If record_last_run is true you can specify which column to track; otherwise the timestamp of the last run is recorded.
    use_column_value => "true"

    # If use_column_value is true, this column must be monotonically increasing (e.g. auto-increment), typically the MySQL primary key.
    tracking_column => "id"
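    # (tracking_column_type defaults to "numeric"; if you track a datetime
    # column instead, it can be set to "timestamp".)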

    # Path of the file that records the position. To reset the read position, clear this file and delete the old indices.
    last_run_metadata_path => "/etc/logstash/conf.d/last_id"

    # Whether to ignore the record in last_run_metadata_path. If true, every run queries all rows from the beginning.
    clean_run => "false"

    # Whether to convert column names to lowercase
    lowercase_column_names => "false"
  }
}

filter {
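  # Parse created_at into @timestamp so each event is indexed by the row's
  # creation time rather than by when Logstash read it.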
    
  date {
       match => ["created_at", "MMMM d'th' yyyy, HH:mm:ss.SSS", "MMMM dd'th' yyyy, HH:mm:ss.SSS","MMMM d'st' yyyy, HH:mm:ss.SSS","MMMM dd'st' yyyy, HH:mm:ss.SSS", "MMMM d'nd' yyyy, HH:mm:ss.SSS", "MMMM dd'nd' yyyy, HH:mm:ss.SSS", "MMMM d'rd' yyyy, HH:mm:ss.SSS", "MMMM dd'rd' yyyy, HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss,SSS","ISO8601"]
       #timezone => "Asia/Taipei"
       timezone => "UTC"
       target => "@timestamp"
   }

}


output {
    # Output to Elasticsearch
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        user => "[logstash_user_name]"
        password => "[logstash_user_password]"
        index => "[Index_Name_Prefix]-%{+YYYY.MM.dd}"

        # Use the row id as the document id so re-read rows update the existing
        # document instead of creating duplicates
        document_id => "%{id}"
        template_overwrite => true
    }

    stdout {
        codec => json_lines
    }
}
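To validate the pipeline file before restarting, and then restart the service (assuming an RPM install with the Logstash binary under /usr/share/logstash/bin and a systemd unit named logstash):

# /usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/my-logstash-input-jdbc.conf
# systemctl restart logstash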

If everything works, you should see Logstash querying every minute for records beyond the last endpoint it read; you can also check the last_id file (written as a single YAML value) to see the current endpoint.

# journalctl -xe -f -l --unit=logstash
....
Apr 03 14:00:00 12042l-ELK.etzone.net logstash[42582]: [2019-04-03T14:00:00,236][INFO ][logstash.inputs.jdbc ] (0.001811s) select a.id as id,a.chatroom_id,a.profile_id,a.messageable_type,a.messageable_id,a.reply_token,a.sent,a.created_at,a.updated_at,b.message_id,b.type,b.text,c.line_user_id,c.display_name,c.picture_url,c.status_message from messages as a left outer join message_texts as b on a.messageable_id=b.id left outer join profiles as c on a.profile_id=c.id where a.id > 2316563;
Apr 03 14:01:00 12042l-ELK.etzone.net logstash[42582]: [2019-04-03T14:01:00,281][INFO ][logstash.inputs.jdbc ] (0.002146s) select a.id as id,a.chatroom_id,a.profile_id,a.messageable_type,a.messageable_id,a.reply_token,a.sent,a.created_at,a.updated_at,b.message_id,b.type,b.text,c.line_user_id,c.display_name,c.picture_url,c.status_message from messages as a left outer join message_texts as b on a.messageable_id=b.id left outer join profiles as c on a.profile_id=c.id where a.id > 2316591;
....


[jerry@12042l-ELK ~]$ cat /etc/logstash/conf.d/last_id 
--- 2316591
[jerry@12042l-ELK ~]$

Author: jerryw1974

Learning and focusing on computer science, cloud infrastructure, virtualization, networking, platform systems, and information/cyber-security related topics.