logstash를 이용하여 source 데이터 가공하기

이전 글에서 ES에 대한 소개와 ELK 설치에 대해 알아보았다.

본글에서는 source data를 가공하고 logstash 를 이용하여 ES에 저장하는 방법에 대해 알아보고자 한다.

Source Data 편집

1편에서 언급했던 교통카드 이용 정보는 다음과 같이 총 9개의 필드가 있다.

사용일 카드번호 승차역(입구) 승차시간 하차역(출구) 하차시간 사용금액 구분 결제예정일자

내가 필요한 필드는 ‘사용일’, ‘승차역’, ‘승차시간’, ‘하차역’, ‘하차시간’,’사용금액’, ‘구분’ 정도라서 해당 필드만 선택하여 새로운 엑셀자료를 만들어 주었다.

그리고 한글 필드명은 여러모로 불편하므로 필드명도 영문명으로 바꿔주었다.

새로 만든 엑셀파일을 csv 형태로 저장해주면 된다.

Logstash를 이용해 ES에 데이터 저장하기

위 수정된 source data를 보면 Boarding_Time(승차시간) , Exit_Time(하차시간) 필드의 데이터가 HH:mm:ss 형태로 되어있어 년,월,일을 알 수가 없다.

그래서 date 필드의 날짜 정보와 합쳐 “yyyy.MM.dd HH:mm:ss” 형태의 date field 를 생성하고 ES에 저장할때 Date type으로 인식하도록 해주는 작업이 필요하다.

이 작업들은 logstash 설정파일을 통해 가능한대

  • 입력
  • 가공
  • 출력

총 3개의 필드로 나뉘어져 있다. 하나씩 알아가 보자.

입력

먼저 csv 형태로 만든 파일을 읽어오는 부분이다.

1
2
3
4
5
6
7
input {
file {
path => "/Users/abc/Downloads/bus.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}

input 부분에 파일형태의 데이터를 입력받는 file플러그인을 사용하였다.

가공(filter)

filter 영역은 활용성이 정말 무긍무진 하다. 새로운 필드를 추가할 수도 있고 제거할 수도 있고 기존 데이터의 형식을 바꾸는등 다양한 활용방법이 있는데 내가 필요한 기능은 위에서 언급했듯이 Boarding_Time(승차시간) , Exit_Time(하차시간) 필드의 데이터 형식을 바꾸고 date type으로 변환 하는것이다.

먼저 해주어야 할 것은 csv 파일의 header 정보를 제공해주고 separator를 설정해주는것이다.

1
2
3
4
csv {
separator => ","
columns => ["date","Boarding_Station","Boarding_Time","Exit_Station","Exit_Time","charge ","classify"]
}

시간정보 변경

기존 승차시간 => 07:51:07

변환된 승차시간 => 2018.03.02 07:51:07

위 작업은 mutate plugin을 이용한다. 기존 date, Boarding_Time, Exit_Time 필드를 이용해 새로운 필드인 Boarding_clock, Exit_clock 필드를 생성해 주고 기존 Boarding_Time, Exit_Time 두 필드는 삭제를 해주었다. date 필드는 검색에 사용될거라 남겨뒀다.

1
2
3
4
5
6
7
mutate {
add_field => {
"Boarding_clock" => "%{date} %{Boarding_Time}"
"Exit_clock" => "%{date} %{Exit_Time}"
}
remove_field => ["Exit_Time", "Boarding_Time"]
}

그리고 새로 생성한 두개의 필드와 date 필드의 type을 Date 형식으로 변환 해주어야 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
date { 
match => ["Boarding_clock", "yyyy.MM.dd HH:mm:ss"]
target => "Boarding_clock"
}

date {
match => ["Exit_clock", "yyyy.MM.dd HH:mm:ss"]
target => "Exit_clock"
}

date {
match => ["date", "yyyy.MM.dd"]
target => "date"
}

마지막으로 @timestamp 필드 정보를 Boarding_clock 필드로 치환한다.

1
2
3
date { 
match => ["Boarding_clock", "yyyy.MM.dd HH:mm:ss"]
}

출력

마지막으로 입력받은 데이터를 출력하는 부분이다. stdout 형태로 출력하여 수정된 데이터 형태를 보며 수정할 수 있고 수정작업이 끝나면 바로 ES로 데이터를 보낼 수 있다.

1
2
3
4
5
6
7
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "office_work"
}
stdout {}
}

다음 글에서는 kibana 를 이용해서 시각화 하는 방법에 대해 알아보자.

공유하기