logstash와 kafka 연동시 Multiple Topic 사용하기

ELK 를 구축할때 kafka 로 프로세스별 log 데이터를 밀어넣은 다음 kafka - logstash 를 연동하여 ElasticSearch로 보내도록 구현중이다.

logstash 에서 여러개의 kafka Topic 을 처리하는 방법에 대해서 정리한 내용이다

A123 process => topic : A1

B123 process => topic : A2

C123 process => topic : A3

이런식으로 3개의 프로세스의 로그가 각각 다른 토픽에 저장되어있다.

ES(ElasticSearch)에는 다음과 같이 저장하려고 한다

1
2
3
4
5
index : server-log-[@date]
- type
- A1-log
- A2-log
- A3-log

logstash 에서 여러개의 topic 을 불러오는건 다음과 같다

1
2
3
4
5
6
7
8
input {
kafka {
bootstrap_servers => "192.168.202.148:9092"
topics => ["topic1","topic2"]
group_id => "logstash"
consumer_threads => 2
decorate_events => true
}

위 topic 부분에 리스트([]) 형식으로 불러올 topic 을 넣어주면 된다.

문제는 topic 들을 각각 다른 ES type으로 분류하여 저장하는것이다.

topic 이름으로 구분하여 ES에 저장할때 사용되는 type 을 지정하고 싶은데 topic 을 구분할 수 있는 방법이 topic 명 말고는 없다.

그래서 logstash config 파일에 output 부분에서 topic 명으로 분기를 만들어 주기로 했는데…

Removal of mapping types >> multiple type 불가

그렇다.. ES 6 버전부터는 index 당 한개의 type 만 생성이 가능하다.

그전에 input - kafka 안쪽에 “decorate_events => true” 설정을 꼭 넣어주자 그래야 topic 을 불러올수가 있다.

type을 여러개 만들면 이와같은 오류가 발생한다.

1
Rejecting mapping update to [index1-2018.06.20] as the final mapping would have more than 1 type

공식 문서에는 document_type을 생성하여 기존 type을 대체하라고 했는데 나는 그냥 원본 데이터에 새로 pName 필드를 추가하여 구분하는 방식으로 해보았다

최종 왼성된 config 설정은 아래와 같다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
input {
kafka {
bootstrap_servers => "192.168.202.148:9092"
topics => ["topic1","topic2"]
group_id => "logstash"
consumer_threads => 2
decorate_events => true
}
}

filter {
if [@metadata][kafka][topic] == "topic1" {
mutate {
add_field => {'pName' => 'topic1'}
}
}
if [@metadata][kafka][topic] == "topic2" {
mutate {
add_field => {'pName' => 'topic2'}
}
}
json {
source => "message"
}
}

output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "index-%{+YYYY.MM.dd}"
document_type => "logs"
}
}

if [@metadata][kafka][topic] 를 이용하여 topic 을 불러와 새로운 필드를 추가하는 방식으로 프로세스별 로그를 구분하도록 하였다.

공유하기