메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* 출처 : http://kysepark.blogspot.kr/2016/03/how-to-build-complex-event-processing.html


본 글은 클라우데라 블로그의 글을 정리한 내용이다. 원문은 아래 링크를 따라가면 볼 수 있다.
대략적인 블로그 글의 내용은 CDH를 business execution engine과 결합하여 CEP(Complex Event Processing)로서 사용할 수 있다는 내용으로 이를 무엇을 사용해서 어떻게 구현했다는 내용이다.


  1. CEP에 대해서
    1. Event Processing이란?
      1. 데이터 스트림을 추적 및 분석하여 좀 더 나은 insight와 결정을 할 수 있도록 해주는 것을 의미
    2. CEP란?
      1. Event Processing의 일종으로 여러 소스로부터 얻어낸 데이터를 결합하여 여러 이벤트들간의 패턴 및 복잡한 관계를 찾아내는 것을 의미
      2. CEP는 여러 데이터 소스들로부터 기회와 위협을 확인하고 이를 실시간으로 경고를 할 수 있도록 해준다.
      3. 오늘날 CEP는 매우 다양한 분야에서 사용되고 있다.
        1. 금융: 거래 분석, 금융사기 탐지
        2. 항공: 운항 모니터링
        3. 의료: 클레임 프로세싱, 환자 모니터링
        4. 에너와 통신: 정전 탐지
    3. 빅데이터
      1. 기하급수적인 데이터의 증가로 빅데이터 처리 필요함.
      2. CDH를 사용하여 이를 해결할 수 있다.
  2. Architecture and Design
    1. CEP를 만들기 위해 CDH를 사용한 architecture는 다음과 같다.
    2. 위 그림과 같은 architecture에서 사용된 컴포넌트들은 다음과 같다.
      1. Ingest: 이벤트 수집을 위해  Apache Kafka or Apache Flume 사용.
      2. Storage: 수집한 이벤트 저장에는 Apache HBase(또는 미래에는 아마 Kudu)를 사용.
      3. Alerting: Apache Kafka 또는 다른 직접적인 API를 통합하여 경고를 할 수 있게 함.
      4. Stream processing
        1. spark streaming을 사용하여 event processing을 처리함. 
        2. Processing은 마이크로 배치로 처리되며 다음과 같은 일을 한다.
          1. Parsing
          2. Lookup
          3. Persistence
          4. Building of current state from a series of historical events
          5. Custom processing logic
        3. 일례로 다양한 sliding-window위의 여러 Spark RDD stream을 join하여 실시간에 가깝게 insight와 trend를 얻을 수 있다.
        4. 이 배치 작업은 매번 수초 간격으로 진행되며 수초보다 적은 end-to-end latency가 나게 한다.
      5. Business process management
        1. Rules framework는 technical 또는 non-technical한 사용자들도 복잡한 business logic을 디자인할 수 있게 해준다.
        2. 이 글에서는 Apahce spark와 Drools를 함께 사용하여 business의 요구사항에 대해 평가해본다.
      6. Metrics: OpenTSDB와 같은 time-series 데이터베이스의 대쉬보드를 통해 metrics를 제공한다. 또한 Cloudera Search + HUE를 사용해 같은 기능을 사용할 수 있다.
    3. 예제로 sepsis-shock criteria(패혈증 쇼크 기준)을 Drools를 사용하여 동작시켜보자. 해당 조건들은 이곳에 언급된 내용들을 다룬다.

      1. 위 그림에서 볼 수 있듯이 24시간내에 환자의 두 개의 vital이 범위를 넘어갈 경우 SIRS 기준에 부합하게 된다. 
      2. 이 vital들은 다른 시각에 기준에 도달하게 된다. 따라서 매시간 환자의 상태를 재구성하기 위해 HBase를 사용하여 매시간 vital을 읽고 rules를 적용할 수 있게 한다.
      3. 예제를 위해 모든 event마다 모든 환자들의 vital을 가져온다고 가정하자. 그리고 snapshot/state-building step은 스킵한다.
      4. 환자가 SIRS 기준에 부합하게 되면 즉시 sepsis, severe sepsis, septic shock등을 순서대로 체크해야한다. 이 평가 프로세스의 flow chart는 다음과 같다.
      5. 모든 조건들을 찾아내고 업무자들에게 친숙하게 데모를 만들기 위해 Drools decision tables를 사용한다. 이러한 접근은 business logic을 Java/Scala code 또는 custom syntax로 가지는 것보다 좀 더 많은 관중(business analyst 포함)들에게 가시성을 제공한다.
      6. 다음은 sepsis calculator를 충족시키기 위해 만들어진 decision table이다.
      7. 위 그림에 대하여
        1. 연한 빨강색으로 채워진 모든 셀들은 code로 돌아가도록 링크되어 있다.
        2. 오렌지색으로 채워진 모든 셀들은 rule이 성공적으로 평가된 이후에 정해진 값을 가진다.
        3. 녹색으로 채워진 모든 셀들은 주어진 rule을 만족시키기 위해 들어오는 데이터들의 범위들과 값들을 가진다.
        4. 모든 파란색으로 채워진 모든 셀들은 rule들의 이름과 그 조건들이다.
      8. 다음은 Spark와 Drools의 통합의 몇 가지 목표들이다.
        1. Rule들의 실행을 Spark/Streaming으로부터 seamless하게 만든다.
        2. 단순성을 위해 rule 엔진의 stateless 부분을 사용하라. 일정한 시간 간격을 유지하며 상태를 저장하는 spark의 sliding window를 사용할 수 있다.
        3. Rule을 요구사항에 기반하여 순차적 또는 무작위로 실행하라.
        4. 몇몇 metrics를 계산하기 위해 rule 실행 결과를 spark dataframe에 넣어라.
  3. Coding
    1. 지금부터 위 목표를 달성하기 위한 절차와 코드 스니펫을 보여주겠다. 모든 코드는 https://github.com/mganta/sprue 에서 다운로드 받을 수 있다.
      1. 각 파티션에 session factory를 한번만 초기화한다. 그리고 모든 다음 dstream 실행에서 재사용한다.

        KieContainer kContainer = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
        kContainer.newStatelessKieSession();


      2. 들어오는 데이터를 HBase에 저장한다.
        //store incoming data in hbase
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertIncomingDataIntoHBase, true)
      3. RDD의 각 이벤트마다 모든 rule들을 실행하고 평가 결과를 RDD와 함께 리턴한다.
        //evaluate all the rules for the batch
            patientStream.foreachRDD(rdd => {
             val evaluatedPatients = rdd.mapPartitions(partitionOfRecords => {
                val ksession = KieSessionFactory.getKieSession(xlsFileName)
                val patients = partitionOfRecords.map(patient => {
                  ksession.execute(patient)
                  patient
                })
                patients
              })
      4. RDD를 dataframe으로 변환하고 몇몇 metrics을 계산한다.
        //convert to dataframe
                val patientdf = sqc.applySchema(evaluatedPatients, classOf[Patient])
                //compute statistics
                val countMatrix = patientdf.groupBy("location").agg(max("evaluationDate"), sum("sirsFlag"), sum("sepsisFlag"), sum("severeSepsisFlag"), sum("septicShockFlag"), sum("organDysfunctionSyndrome"))
               countMatrix.show()
      5. HBase에 업데이트를 저장한다.
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertEvaluatedDataIntoHBase, true
      6. Time-series rest api를 호출하고 마이크로 배치 metrics를 게시한다. Time-series 대쉬보드는 이 데이터를 읽을 수 있다.(여기서  OpenTSDB를 어떻게 설치하는 지를 배울 수 있다.)
        //opentsdb update statistics
        countMatrix.foreach(row => {
        TSDBUpdater.loadPatientStats(row.getString(0), row.getLong(1), row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5), row.getLong(6))
          })
      7. 위의 모든 절차들은 여기 spark driver code와 링크되어 있다.
      8. 이 예제는 무작위로 선출된 환자의 데이터 스트림 생성하기 위해 QueueStream을 사용한다. 실제 시나리오에서는 각 이벤트에서 hl7 메시지를 받게 된다.
      9. 예제를 실행하면 들어오는 각 이벤트에서 rule이 실해되는 걸 볼 수 있다. 그리고 각 상태의 그룹화된 metrics을 볼 수 있다. 이는 아래와 같다.
        Total Patients in batch: 212
        Patients with atleast one condition: 137
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199233|            3|              2|                    2|                   0|                            0|
        |      NE|      1443198199233|            8|              4|                    4|                   1|                            0|
        |      TX|      1443198199233|           10|              8|                    8|                   7|                            1|
        |      NM|      1443198199232|            8|              6|                    6|                   3|                            0|
        |      NY|      1443198199233|            6|              4|                    3|                   3|                            0|
        |      OK|      1443198199233|            7|              5|                    3|                   1|                            0|
        |      VA|      1443198199232|           16|             14|                   12|                   7|                            1|
        |      IL|      1443198199233|            5|              3|                    3|                   1|                            0|
        |      CA|      1443198199233|            7|              6|                    6|                   4|                            0|
        |      KS|      1443198199233|           12|              8|                    8|                   6|                            0|
        |      LA|      1443198199233|           14|              8|                    7|                   2|                            1|
        |      SC|      1443198199233|           12|              9|                    6|                   4|                            0|
        |      FL|      1443198199233|            7|              4|                    4|                   2|                            0|
        |      MN|      1443198199233|            8|              5|                    5|                   2|                            0|
        |      GA|      1443198199233|           14|             12|                   12|                   6|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        Total Patients in batch: 247
        Patients with atleast one condition: 170
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199242|            1|              1|                    1|                   1|                            0|
        |      NE|      1443198199241|           14|             11|                   11|                   5|                            1|
        |      TX|      1443198199242|           13|             10|                    9|                   6|                            0|
        |      NM|      1443198199241|           11|              4|                    3|                   2|                            0|
        |      NY|      1443198199241|           13|              7|                    6|                   4|                            0|
        |      OK|      1443198199241|            7|              5|                    5|                   3|                            0|
        |      VA|      1443198199242|           10|              6|                    6|                   2|                            0|
        |      IL|      1443198199241|           17|             12|                   12|                   5|                            1|
        |      CA|      1443198199241|           17|              8|                    7|                   2|                            0|
        |      KS|      1443198199242|           12|              8|                    7|                   5|                            2|
        |      LA|      1443198199242|           15|             11|                    9|                   4|                            2|
        |      SC|      1443198199241|           13|             10|                    9|                   5|                            0|
        |      FL|      1443198199241|           10|              5|                    5|                   1|                            0|
        |      MN|      1443198199241|           10|              6|                    6|                   3|                            0|
        |      GA|      1443198199241|            7|              2|                    1|                   1|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
      10.  또는 OpenTSDB를 set up하면 다음과 같은 그림을 볼 수 있다.
  4. 결론
    1. 복잡한 시스템을 설계할 때에는 rule 엔진을 사용하는게 유익한 선택이다.(Logic과 데이터의 분리는 domain expert에게 logic 결정에 대한 insight를 주면서 유연한 시스템을 낳는다.)
    2. 지금까지 본 바와 같이 CDH(for Spark, HBase, Kafka)를 rule엔진과 결합하여 사용하는게 복잡한 business logic을 평가하고 실시간으로 이를 동작시키는데 도움이 되는 걸 알 수 있다.
번호 제목 날짜 조회 수
321 .gitignore파일에 지정되지 않은 파일이 ignore되는 경우 확인방법 2016.11.22 852
320 github에 있는 프로젝트와 로컬에서 작업한 프로젝트 합치기 2016.11.22 859
319 특정 커밋 시점(commit id를 기준으로)으로 돌리기(reset) 2016.11.21 608
318 Github를 이용하는 전체 흐름 이해하기 2016.11.18 269
317 특정 단계의 commit상태로 만들기(이렇게 하면 중간에 반영된 모든 commit를 history가 삭제된다) 2016.11.17 878
316 git 초기화(Windows에서 Git Bash사용) 2016.11.17 1103
315 spark notebook 0.7.0설치및 설정 2016.11.14 753
314 참고할만한 spark예제를 설명하는 사이트 2016.11.11 528
313 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 2016.11.08 1464
312 Eclipse실행시 Java was started but returned exit code=1이라는 오류가 발생할때 조치방법 2016.11.07 862
311 [SparkR]SparkR 설치 사용기 1 - Installation Guide On Yarn Cluster & Mesos Cluster & Stand Alone Cluster file 2016.11.04 551
310 데이타 분석및 머신러닝에 도움이 도움이 되는 사이트 2016.11.04 764
309 java스레드 덤프 분석하기 file 2016.11.03 248
308 centos 6에서 mariadb 5.1 to 10.0 으로 upgrade 2016.11.01 265
307 Spark Streaming 코드레벨단에서의 성능개선 2016.10.31 810
306 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 2016.10.31 1199
305 Flume을 이용한 데이타 수집시 HBase write 성능 튜닝 file 2016.10.31 806
» How-to: Build a Complex Event Processing App on Apache Spark and Drools file 2016.10.31 546
303 How-to: Tune Your Apache Spark Jobs (Part 2) file 2016.10.31 457
302 mybatis와 spring을 org.apache.commons.dbcp2.BasicDataSource의 DataSource로 연동할때 DB설정(참고) 2016.10.31 1145
위로