시작하며

Logstash는 다양한 경로에서 데이터를 수집하고 처리한다. 수집하는 데이터의 형태는 정말 다양하며, 때로는 복잡한 전처리를 요구하기도 한다. 이번 포스팅에서는 JSON 객체 안에 배열 형태로 들어오는 JSON 객체를 파싱하는 방법을 기록으로 남긴다.

JSON 객체 배열 파싱 with Ruby 필터

JSON 데이터 형태

이번 포스팅에서 가정할 Input 데이터는 아래와 같다. JSON 객체 내부에 배열 형태의 JSON 객체들을 가지는 형태로, 배열 내 각 객체 내부의 필드값을 파싱하여 처리할 일들이 가끔 생긴다. 이런 경우에는 ruby filter를 사용하면 처리를 간편하게 할 수 있다.

{
 
    ...
 
  "wanted_to_be_parsed" : {
    "field1" : "value1",
    "field2" : "value2",
    "field3" : [
      {
        "InnerField1" : "value3",
        "InnerField2" : "value4",
      },
      {
        "InnerField1" : "value3",
        "InnerField2" : "value4"
      },
 
      ...
 
    ]
 
  }
}

Ruby 코드로 파싱하기

위의 예시 JSON Object를 파싱해 보겠다. (지금 작성하는 부분은 Logstash 전체 .conf 파일의 filter 내부이다.)

우선, 기본적으로 json filter를 통해 파싱하려는 대상 필드를 target 필드로 이동시킨다. 이때 target field는 json_parsed로 가정한다.

json {
    source => "wanted_to_be_parsed"
    target => "json_parsed"
}

이제 ruby filter를 통해 한 번 파싱된 json 객체의 특정 필드 배열을 집계한다. Python과 C++을 섞은 듯한 문법으로 간단하게 반복문 처리 및 집계 처리가 가능하다.

  ruby {
      code => '
          field1ValueList = []
          field2ValueList = []
 
          event.get("[wanted_to_parse][field3]").each do |each_json_obj|
              field1ValueList << each_json_obj["InnerField1"]
              field2ValueList << each_json_obj["InnerField2"]
          end
 
          event.set("elasticsearch_index_field1", field1ValueList.join(","))
          event.set("elasticsearch_index_field2", field2ValueList.join(","))
      '
  }

전체 Logstash config 파일 형태

Logstash는 기본적으로 input, filter, output 의 구조로 되어 있으므로, 위의 예제를 Kafka에서 Elasticsearch로 보내는 파이프라인이라고 가정하면 아래와 같은 구조가 전체 .conf 파일의 모습이 된다. 이는 조금 특수한 상황에서의 JSON 객체 처리를 다룬 형태이고, ruby와 함께 여러 필터들을 조합하면 복잡한 전처리도 어렵지 않게 처리할 수 있다.

input{
    kafka{
        bootstrap_servers => ""
        topics => []
        auto_offset_reset =>
        group_id =>
        client_id =>
        decorate_events =>
    }
} 
-> 
filter{
    mutate{
        add_field => {
            "filed1" => "%{[@metadata][kafka][topic]}"
            ...
            "fieldN" => ""
        }
    }
    ...
    dissect {
        mapping => {
            "message" => "%{field1}{delimiter}%{field2}{delimiter}%{field3} ... %{fieldN}"
        }
    }
    ...
    json {
        source => "wanted_to_be_parsed"
        target => "json_parsed"
    }
    ...
    if [특정필드] == '특정값' {
        ruby {
            code => '
                field1ValueList = []
                field2ValueList = []
 
                event.get("[wanted_to_parse][field3]").each do |each_json_obj|
                    field1ValueList << each_json_obj["InnerField1"]
                    field2ValueList << each_json_obj["InnerField2"]
                end
 
                event.set("elasticsearch_index_field1", field1ValueList.join(","))
                event.set("elasticsearch_index_field2", field2ValueList.join(","))
            '
        }
    }
    ...
 
    mutate {
        remove_field => [
            "json_parsed"
        ]
    }
} 
-> 
output{
    elasticsearch {
        hosts => ["http://hostip:postport", ...]
        index => 
        document_id =>
        user => "${elasticsearch_username}"
        password => "${elasticsearch_passowrd}"
    }
}

정리하며

Logstash에서 JSON 배열 형태의 중첩 객체를 파싱할 때는 json filter로 먼저 대상 필드를 이동시킨 후, ruby filter를 활용해 배열을 순회하며 원하는 필드를 추출하는 방식이 효과적이다. ruby와 여러 필터를 조합하면 복잡한 전처리 로직도 단일 Logstash 파이프라인 안에서 처리할 수 있다.