Parsing CSV with Filebeat, Logstash, and Elasticsearch

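Logstash can dynamically ingest, transform, and ship data regardless of its format or complexity. It can derive structure from unstructured data with Grok, decode geographic coordinates from IP addresses, anonymize or exclude sensitive fields, and simplify the overall processing.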

I. Preparation

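  1. Download and install the same version of Elasticsearch, Logstash, and Filebeat.
  2. Prepare the test data file movies.csv (download link), delete most of its rows, and inspect it with cat movies.csv.
    • Final path: /Users/your_path/elk/ml-25m/movies.csv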
movieId,title,genres
38,It Takes Two (1995),Children|Comedy
39,Clueless (1995),Comedy|Romance
40,"Cry, the Beloved Country (1995)",Drama
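The last sample row quotes its title because the title itself contains a comma, so splitting each line on every comma would break it. As a minimal sketch (Python's standard csv module is used here only for illustration and is not part of the pipeline), a CSV-aware parser keeps that title intact:

```python
import csv
import io

# The sample rows shown above, inlined so the snippet is self-contained.
SAMPLE = '''movieId,title,genres
38,It Takes Two (1995),Children|Comedy
39,Clueless (1995),Comedy|Romance
40,"Cry, the Beloved Country (1995)",Drama
'''

def read_rows(text):
    """Parse CSV text into a list of dicts keyed by the header row."""
    return list(csv.DictReader(io.StringIO(text)))

rows = read_rows(SAMPLE)

# Naive split of the last data row on every comma, for comparison.
naive = SAMPLE.splitlines()[3].split(",")

print(rows[2]["title"])  # the quoted field is kept as a single value
print(len(naive))        # the naive split produces one field too many
```

This is the reason the pipeline below relies on a dedicated CSV filter rather than a plain string split.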

II. Hands-on

  1. Start the Elasticsearch instance with ./bin/elasticsearch, using the default configuration in config/elasticsearch.yml:
#cluster.name: my-application
#node.name: node-1
#node.attr.rack: r1
#path.data: /path/to/data
#path.logs: /path/to/logs
#bootstrap.memory_lock: true
#network.host: 192.168.0.1
#http.port: 9200
#discovery.seed_hosts: ["host1", "host2"]
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"
  2. Start the Logstash instance with ./bin/logstash -f config/logstash.conf. The contents of logstash.conf are as follows:
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 9011
  }
  #file {
  #  path => ["/Users/your_path/elk/ml-25m/movies.csv"]
  #  start_position => "beginning"
  #  sincedb_path => "/dev/null"
  #}
}

filter {
  csv {
    separator => ","
    columns => ["movieId","title","genre"]
  }
  mutate {
    split => { "genre" => "|" }
    # remove_field => ["path", "host","@timestamp","message"]
  }
  mutate {
    # The csv filter above defines no "year" field, so this conversion has no effect as written.
    convert => {
      "year" => "integer"
    }
    strip => ["title"]
    #remove_field => ["path", "host","@timestamp","message","content"]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "movies"
  }
  stdout {}
}
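To make the filter stage easier to follow, here is a rough Python approximation of what the csv and mutate filters do to a single raw line. It is only an illustration under simplifying assumptions, not how Logstash is implemented; the function name apply_filters is hypothetical.

```python
import csv

# Same column names as the csv filter's "columns" setting.
COLUMNS = ["movieId", "title", "genre"]

def apply_filters(message):
    """Approximate the csv + mutate filters for one raw line (illustration only)."""
    values = next(csv.reader([message]))        # csv { separator => "," }
    event = dict(zip(COLUMNS, values))          # assign the configured column names
    event["genre"] = event["genre"].split("|")  # mutate { split => { "genre" => "|" } }
    event["title"] = event["title"].strip()     # mutate { strip => ["title"] }
    return event

event = apply_filters('40,"Cry, the Beloved Country (1995)",Drama')
print(event)
```

Note that movieId stays a string, which matches the "movieId" : "40" value in the query results. Because the configuration does not skip the header, the line movieId,title,genres is presumably processed like any other row and indexed as a document too.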
  3. Start the Filebeat instance with ./filebeat -e -c movie.yml. The configuration is as follows:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/your_path/elk/ml-25m/movies.csv

setup.template.settings:
  index.number_of_shards: 1

output.logstash:
  hosts: ["localhost:9011"]

#output.elasticsearch:
#  hosts: ["localhost:9200"]
#  protocol: "http"
#  index: "filebeat-movies-%{+yyyy.MM.dd}"

setup.ilm.enabled: false
setup.template.name: "filebeat-movies"
setup.template.pattern: "filebeat-movies-*"
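Conceptually, a log input like this one tails the file: it remembers how far it has read and ships each newly completed line as an event whose message field holds the raw text. The sketch below illustrates that idea in Python with a hypothetical read_new_lines helper and a temporary file standing in for movies.csv; it is not Filebeat's actual implementation.

```python
import os
import tempfile

def read_new_lines(path, offset):
    """Return (events, new_offset) for complete lines written after `offset`."""
    events = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line.endswith(b"\n"):  # end of file or a partially written line
                break
            offset = f.tell()             # only advance past complete lines
            events.append({"message": line.decode("utf-8").rstrip("\r\n")})
    return events, offset

# Demo on a temporary file (a stand-in for movies.csv).
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "movies.csv")
    with open(path, "w", encoding="utf-8") as f:
        f.write("38,It Takes Two (1995),Children|Comedy\n")
    first, offset = read_new_lines(path, 0)

    # Appending a row later yields only the new line on the next read.
    with open(path, "a", encoding="utf-8") as f:
        f.write("111,liusir,haha\n")
    second, offset = read_new_lines(path, offset)

print(first)
print(second)
```

Tracking the offset is what lets rows appended to the file later reach Elasticsearch without the earlier rows being sent again.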
  4. Query

    • curl -XGET "localhost:9200/movies/_search?pretty" -H "content-type:application/json" -d '{"_source":["movieId","title"],"query":{"match":{"title":"Beloved*"}}}'
    {
      "took" : 107,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 1,
          "relation" : "eq"
        },
        "max_score" : 0.94597876,
        "hits" : [
          {
            "_index" : "movies",
            "_type" : "_doc",
            "_id" : "On05CXkBxPLXGnNxw4p0",
            "_score" : 0.94597876,
            "_source" : {
              "movieId" : "40",
              "title" : "Cry, the Beloved Country (1995)"
            }
          }
        ]
      }
    }
  5. Append data

    • echo 111,liusir,haha >> movies.csv
    • echo 222,cry,hehe >> movies.csv
    • echo 222,take,hehe >> movies.csv
    • echo 222,takes off,heihei >> movies.csv
  6. Query the documents whose title contains takes

    • curl -XGET "localhost:9200/_search?pretty" -H "content-type:application/json" -d '{"_source":["movieId","title"],"query":{"match":{"title":"takes*"}}}'
    {
      "took" : 489,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "max_score" : 1.3125186,
        "hits" : [
          {
            "_index" : "movies",
            "_type" : "_doc",
            "_id" : "QH3nC3kBxPLXGnNxIYo-",
            "_score" : 1.3125186,
            "_source" : {
              "movieId" : "222",
              "title" : "takes off"
            }
          },
          {
            "_index" : "movies",
            "_type" : "_doc",
            "_id" : "OX05CXkBxPLXGnNxw4pz",
            "_score" : 0.9411969,
            "_source" : {
              "movieId" : "38",
              "title" : "It Takes Two (1995)"
            }
          }
        ]
      }
    }
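The same searches can be issued from a script. The following is a minimal sketch using only the Python standard library; the helper names build_title_query and search are hypothetical, and the URL assumes the local instance and movies index used above. A match query analyzes its input instead of interpreting wildcards, so the trailing * in the curl commands most likely has no effect with the default analyzer, and the sketch leaves it out.

```python
import json
import urllib.request

def build_title_query(term, fields=("movieId", "title")):
    """Build a search body with the same shape as the curl commands above."""
    return {"_source": list(fields), "query": {"match": {"title": term}}}

def search(term, url="http://localhost:9200/movies/_search"):
    """Send the query; needs the Elasticsearch instance started earlier to be running."""
    body = json.dumps(build_title_query(term)).encode("utf-8")
    # The _search endpoint accepts POST as well as GET with a request body.
    req = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# Only the request body is printed here, so no running cluster is required.
print(json.dumps(build_title_query("takes")))
```

With the cluster running, search("takes")["hits"]["hits"] should return a list of hits in the same structure as the JSON responses shown above.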
