메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


* 출처 : http://kysepark.blogspot.kr/2016/03/how-to-build-complex-event-processing.html


본 글은 클라우데라 블로그의 글을 정리한 내용이다. 원문은 아래 링크를 따라가면 볼 수 있다.
대략적인 블로그 글의 내용은 CDH를 business execution engine과 결합하여 CEP(Complex Event Processing)로서 사용할 수 있다는 내용으로 이를 무엇을 사용해서 어떻게 구현했다는 내용이다.


  1. CEP에 대해서
    1. Event Processing이란?
      1. 데이터 스트림을 추적 및 분석하여 좀 더 나은 insight와 결정을 할 수 있도록 해주는 것을 의미
    2. CEP란?
      1. Event Processing의 일종으로 여러 소스로부터 얻어낸 데이터를 결합하여 여러 이벤트들간의 패턴 및 복잡한 관계를 찾아내는 것을 의미
      2. CEP는 여러 데이터 소스들로부터 기회와 위협을 확인하고 이를 실시간으로 경고를 할 수 있도록 해준다.
      3. 오늘날 CEP는 매우 다양한 분야에서 사용되고 있다.
        1. 금융: 거래 분석, 금융사기 탐지
        2. 항공: 운항 모니터링
        3. 의료: 클레임 프로세싱, 환자 모니터링
        4. 에너와 통신: 정전 탐지
    3. 빅데이터
      1. 기하급수적인 데이터의 증가로 빅데이터 처리 필요함.
      2. CDH를 사용하여 이를 해결할 수 있다.
  2. Architecture and Design
    1. CEP를 만들기 위해 CDH를 사용한 architecture는 다음과 같다.
    2. 위 그림과 같은 architecture에서 사용된 컴포넌트들은 다음과 같다.
      1. Ingest: 이벤트 수집을 위해  Apache Kafka or Apache Flume 사용.
      2. Storage: 수집한 이벤트 저장에는 Apache HBase(또는 미래에는 아마 Kudu)를 사용.
      3. Alerting: Apache Kafka 또는 다른 직접적인 API를 통합하여 경고를 할 수 있게 함.
      4. Stream processing
        1. spark streaming을 사용하여 event processing을 처리함. 
        2. Processing은 마이크로 배치로 처리되며 다음과 같은 일을 한다.
          1. Parsing
          2. Lookup
          3. Persistence
          4. Building of current state from a series of historical events
          5. Custom processing logic
        3. 일례로 다양한 sliding-window위의 여러 Spark RDD stream을 join하여 실시간에 가깝게 insight와 trend를 얻을 수 있다.
        4. 이 배치 작업은 매번 수초 간격으로 진행되며 수초보다 적은 end-to-end latency가 나게 한다.
      5. Business process management
        1. Rules framework는 technical 또는 non-technical한 사용자들도 복잡한 business logic을 디자인할 수 있게 해준다.
        2. 이 글에서는 Apahce spark와 Drools를 함께 사용하여 business의 요구사항에 대해 평가해본다.
      6. Metrics: OpenTSDB와 같은 time-series 데이터베이스의 대쉬보드를 통해 metrics를 제공한다. 또한 Cloudera Search + HUE를 사용해 같은 기능을 사용할 수 있다.
    3. 예제로 sepsis-shock criteria(패혈증 쇼크 기준)을 Drools를 사용하여 동작시켜보자. 해당 조건들은 이곳에 언급된 내용들을 다룬다.

      1. 위 그림에서 볼 수 있듯이 24시간내에 환자의 두 개의 vital이 범위를 넘어갈 경우 SIRS 기준에 부합하게 된다. 
      2. 이 vital들은 다른 시각에 기준에 도달하게 된다. 따라서 매시간 환자의 상태를 재구성하기 위해 HBase를 사용하여 매시간 vital을 읽고 rules를 적용할 수 있게 한다.
      3. 예제를 위해 모든 event마다 모든 환자들의 vital을 가져온다고 가정하자. 그리고 snapshot/state-building step은 스킵한다.
      4. 환자가 SIRS 기준에 부합하게 되면 즉시 sepsis, severe sepsis, septic shock등을 순서대로 체크해야한다. 이 평가 프로세스의 flow chart는 다음과 같다.
      5. 모든 조건들을 찾아내고 업무자들에게 친숙하게 데모를 만들기 위해 Drools decision tables를 사용한다. 이러한 접근은 business logic을 Java/Scala code 또는 custom syntax로 가지는 것보다 좀 더 많은 관중(business analyst 포함)들에게 가시성을 제공한다.
      6. 다음은 sepsis calculator를 충족시키기 위해 만들어진 decision table이다.
      7. 위 그림에 대하여
        1. 연한 빨강색으로 채워진 모든 셀들은 code로 돌아가도록 링크되어 있다.
        2. 오렌지색으로 채워진 모든 셀들은 rule이 성공적으로 평가된 이후에 정해진 값을 가진다.
        3. 녹색으로 채워진 모든 셀들은 주어진 rule을 만족시키기 위해 들어오는 데이터들의 범위들과 값들을 가진다.
        4. 모든 파란색으로 채워진 모든 셀들은 rule들의 이름과 그 조건들이다.
      8. 다음은 Spark와 Drools의 통합의 몇 가지 목표들이다.
        1. Rule들의 실행을 Spark/Streaming으로부터 seamless하게 만든다.
        2. 단순성을 위해 rule 엔진의 stateless 부분을 사용하라. 일정한 시간 간격을 유지하며 상태를 저장하는 spark의 sliding window를 사용할 수 있다.
        3. Rule을 요구사항에 기반하여 순차적 또는 무작위로 실행하라.
        4. 몇몇 metrics를 계산하기 위해 rule 실행 결과를 spark dataframe에 넣어라.
  3. Coding
    1. 지금부터 위 목표를 달성하기 위한 절차와 코드 스니펫을 보여주겠다. 모든 코드는 https://github.com/mganta/sprue 에서 다운로드 받을 수 있다.
      1. 각 파티션에 session factory를 한번만 초기화한다. 그리고 모든 다음 dstream 실행에서 재사용한다.

        KieContainer kContainer = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
        kContainer.newStatelessKieSession();


      2. 들어오는 데이터를 HBase에 저장한다.
        //store incoming data in hbase
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertIncomingDataIntoHBase, true)
      3. RDD의 각 이벤트마다 모든 rule들을 실행하고 평가 결과를 RDD와 함께 리턴한다.
        //evaluate all the rules for the batch
            patientStream.foreachRDD(rdd => {
             val evaluatedPatients = rdd.mapPartitions(partitionOfRecords => {
                val ksession = KieSessionFactory.getKieSession(xlsFileName)
                val patients = partitionOfRecords.map(patient => {
                  ksession.execute(patient)
                  patient
                })
                patients
              })
      4. RDD를 dataframe으로 변환하고 몇몇 metrics을 계산한다.
        //convert to dataframe
                val patientdf = sqc.applySchema(evaluatedPatients, classOf[Patient])
                //compute statistics
                val countMatrix = patientdf.groupBy("location").agg(max("evaluationDate"), sum("sirsFlag"), sum("sepsisFlag"), sum("severeSepsisFlag"), sum("septicShockFlag"), sum("organDysfunctionSyndrome"))
               countMatrix.show()
      5. HBase에 업데이트를 저장한다.
        hbaseContext.streamBulkPut[Patient](patientStream, patientTable,  HBaseUtil.insertEvaluatedDataIntoHBase, true
      6. Time-series rest api를 호출하고 마이크로 배치 metrics를 게시한다. Time-series 대쉬보드는 이 데이터를 읽을 수 있다.(여기서  OpenTSDB를 어떻게 설치하는 지를 배울 수 있다.)
        //opentsdb update statistics
        countMatrix.foreach(row => {
        TSDBUpdater.loadPatientStats(row.getString(0), row.getLong(1), row.getLong(2), row.getLong(3), row.getLong(4), row.getLong(5), row.getLong(6))
          })
      7. 위의 모든 절차들은 여기 spark driver code와 링크되어 있다.
      8. 이 예제는 무작위로 선출된 환자의 데이터 스트림 생성하기 위해 QueueStream을 사용한다. 실제 시나리오에서는 각 이벤트에서 hl7 메시지를 받게 된다.
      9. 예제를 실행하면 들어오는 각 이벤트에서 rule이 실해되는 걸 볼 수 있다. 그리고 각 상태의 그룹화된 metrics을 볼 수 있다. 이는 아래와 같다.
        Total Patients in batch: 212
        Patients with atleast one condition: 137
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199233|            3|              2|                    2|                   0|                            0|
        |      NE|      1443198199233|            8|              4|                    4|                   1|                            0|
        |      TX|      1443198199233|           10|              8|                    8|                   7|                            1|
        |      NM|      1443198199232|            8|              6|                    6|                   3|                            0|
        |      NY|      1443198199233|            6|              4|                    3|                   3|                            0|
        |      OK|      1443198199233|            7|              5|                    3|                   1|                            0|
        |      VA|      1443198199232|           16|             14|                   12|                   7|                            1|
        |      IL|      1443198199233|            5|              3|                    3|                   1|                            0|
        |      CA|      1443198199233|            7|              6|                    6|                   4|                            0|
        |      KS|      1443198199233|           12|              8|                    8|                   6|                            0|
        |      LA|      1443198199233|           14|              8|                    7|                   2|                            1|
        |      SC|      1443198199233|           12|              9|                    6|                   4|                            0|
        |      FL|      1443198199233|            7|              4|                    4|                   2|                            0|
        |      MN|      1443198199233|            8|              5|                    5|                   2|                            0|
        |      GA|      1443198199233|           14|             12|                   12|                   6|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        Total Patients in batch: 247
        Patients with atleast one condition: 170
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |location|MAX(evaluationDate)|SUM(sirsFlag)|SUM(sepsisFlag)|SUM(severeSepsisFlag)|SUM(septicShockFlag)|SUM(organDysfunctionSyndrome)|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
        |      MS|      1443198199242|            1|              1|                    1|                   1|                            0|
        |      NE|      1443198199241|           14|             11|                   11|                   5|                            1|
        |      TX|      1443198199242|           13|             10|                    9|                   6|                            0|
        |      NM|      1443198199241|           11|              4|                    3|                   2|                            0|
        |      NY|      1443198199241|           13|              7|                    6|                   4|                            0|
        |      OK|      1443198199241|            7|              5|                    5|                   3|                            0|
        |      VA|      1443198199242|           10|              6|                    6|                   2|                            0|
        |      IL|      1443198199241|           17|             12|                   12|                   5|                            1|
        |      CA|      1443198199241|           17|              8|                    7|                   2|                            0|
        |      KS|      1443198199242|           12|              8|                    7|                   5|                            2|
        |      LA|      1443198199242|           15|             11|                    9|                   4|                            2|
        |      SC|      1443198199241|           13|             10|                    9|                   5|                            0|
        |      FL|      1443198199241|           10|              5|                    5|                   1|                            0|
        |      MN|      1443198199241|           10|              6|                    6|                   3|                            0|
        |      GA|      1443198199241|            7|              2|                    1|                   1|                            0|
        +--------+-------------------+-------------+---------------+---------------------+--------------------+-----------------------------+
      10.  또는 OpenTSDB를 set up하면 다음과 같은 그림을 볼 수 있다.
  4. 결론
    1. 복잡한 시스템을 설계할 때에는 rule 엔진을 사용하는게 유익한 선택이다.(Logic과 데이터의 분리는 domain expert에게 logic 결정에 대한 insight를 주면서 유연한 시스템을 낳는다.)
    2. 지금까지 본 바와 같이 CDH(for Spark, HBase, Kafka)를 rule엔진과 결합하여 사용하는게 복잡한 business logic을 평가하고 실시간으로 이를 동작시키는데 도움이 되는 걸 알 수 있다.
번호 제목 날짜 조회 수
» How-to: Build a Complex Event Processing App on Apache Spark and Drools file 2016.10.31 552
303 How-to: Tune Your Apache Spark Jobs (Part 2) file 2016.10.31 478
302 mybatis와 spring을 org.apache.commons.dbcp2.BasicDataSource의 DataSource로 연동할때 DB설정(참고) 2016.10.31 1257
301 Caused by: java.sql.SQLNonTransientConnectionException: Could not read resultset: unexpected end of stream, read 0 bytes from 4 오류시 확인/조치할 내용 2016.10.31 4387
300 VisualVM 1.3.9을 이용한 spark-submit JVM 모니터링을 위한 설정및 spark-submit실행 옵션 2016.10.28 2414
299 VisualVM 1.3.9을 이용한 JVM 모니터링 file 2016.10.27 891
298 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 2016.10.24 1242
297 producer / consumer구현시 설정 옵션 설명 2016.10.19 777
296 java.lang.OutOfMemoryError: unable to create new native thread오류 발생지 조치사항 2016.10.17 1205
295 동시에 많은 요청이 endpoint로 몰려서java.net.NoRouteToHostException가 발생하는 경우의 처리방법 2016.10.17 811
294 AIX 7.1에 Python 2.7.11설치하기 2016.10.06 1254
293 프로그래밍 언어별 딥러닝 라이브러리 정리 file 2016.10.05 692
292 모두를 위한 머신러닝과 딥러닝의 강의 file 2016.09.27 750
291 DBCP Datasource(org.apache.commons.dbcp.BasicDataSource) 설정 및 속성 설명 2016.09.26 966
290 AIX 7.1에서 hive실행시 "hive: line 86: readlink: command not found" 오류가 발생시 임시 조치사항 2016.09.25 1014
289 hive기동시 Caused by: java.net.URISyntaxException: Relative path in absolute URI: ${system:java.io.tmpdir%7D/$%7Bsystem:user.name%7D 오류 발생시 조치사항 2016.09.25 1054
288 schema설정없이 hive를 최초에 실행했을때 발생하는 오류메세지및 처리방법 2016.09.25 1492
287 파일끝에 붙는 ^M 일괄 지우기(linux, unix(AIX)) 혹은 파일내에 있는 ^M지우기 2016.09.24 238
286 AIX 7.1에 MariaDB 10.2 소스 설치 2016.09.24 2938
285 ./hadoop-daemon.sh start namenode로 namenode기동시 EditLog의 custerId, namespaceId가 달라서 발생하는 오류 해결방법 2016.09.24 506
위로