메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


/tmp/data/work_result에 파일이 들어오면 spooldir에 의해서 파일을 읽어 memoryChannel을 통하여 HDFS의 hdfs://nameservice1:8020/DATA/work_result에 저장한다.

spool된 파일은 "agent.sources.spooldirSource.deletePolicy = immediate"설정에 의해서 OS상의 파일이 삭제된다.

그리고 HDFS에는 flume소유자로 파일이 생성된다.


---------flume.conf-----------

agent.sources = spooldirSource

agent.channels = memoryChannel

agent.sinks = hdfsSink


agent.sources.spooldirSource.type = spooldir

agent.sources.spooldirSource.spoolDir = /tmp/data/work_result

agent.sources.spooldirSource.channels = memoryChannel

agent.sources.spooldirSource.deserializer.maxLineLength = 100000000

agent.sources.spooldirSource.basenameHeader = true

agent.sources.spooldirSource.deletePolicy = immediate


agent.channels.memoryChannel.type = memory

agent.channels.memoryChannel.capacity = 1000000

agent.channels.memoryChannel.transactionCapacity = 1000



agent.sinks.hdfsSink.type = hdfs

agent.sinks.hdfsSink.channel = memoryChannel

agent.sinks.hdfsSink.hdfs.path = hdfs://nameservice1:8020/DATA/work_result

agent.sinks.hdfsSink.hdfs.fileType = DataStream

agent.sinks.hdfsSink.hdfs.rollCount = 0

agent.sinks.hdfsSink.hdfs.rollInterval= 300

agent.sinks.hdfsSink.hdfs.rollSize= 100000000

agent.sinks.hdfsSink.hdfs.batchSize = 100

agent.sinks.hdfsSink.hdfs.filePrefix = %{basename}

agent.sinks.hdfsSink.hdfs.kerberosPrincipal = flume@GOOPER.COM

agent.sinks.hdfsSink.hdfs.kerberosKeytab = /var/lib/keytab/flume.keytab

agent.sinks.hdfsSink.hdfs.proxyUser = flume

번호 제목 날짜 조회 수
39 [CDP7.1.7] oozie sqoop action으로 import혹은 export수행시 발생한 오류에 대한 자세한 로그 확인 하는 방법 2024.04.19 209
38 Oracle 12c DB의 LOB타입 컬럼이 있는 테이블을 import할 때 주의 할 사항 2022.09.14 1101
37 oracle 접속 방식에 따른 --connect 지정 방법 2022.02.11 509
36 Oracle NLOB type의 데이터를 import하는 경우 No Java type for SQL type 2011 for column rst와 같은 오류 발생시 조치사항 2022.01.14 337
35 oracle 12에 sqoop해서 데이터 import하기 (console에서 sqoop import하는 방법) 2021.12.31 1001
34 [sap] Error: java.io.IOException: SQLException in nextKeyValue 오류 발생 2020.06.08 1304
33 [sqoop] mapper를 2이상으로 설정하기 위한 split-by컬럼을 찾을때 유용하게 활용할 수 있는 쿼리 2020.05.13 1122
32 source, sink를 직접 구현하여 사용하는 예시 2019.05.30 1432
» kerberos설정된 상태의 spooldir->memory->hdfs로 저장하는 과정의 flume agent configuration구성 예시 2019.05.30 994
30 kafka에서 메세지 중복 consume이 발생할 수 있는 상황 2018.10.23 974
29 컬럼및 라인의 구분자를 지정하여 sqoop으로 데이타를 가져오고 hive테이블을 생성하는 명령문 2018.08.03 1147
28 sqoop으로 mariadb에 접근해서 hive 테이블로 자동으로 생성하기 2018.08.03 1340
27 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 503
26 kafka-manager 1.3.3.4 설정및 실행하기 2017.03.20 2363
25 Kafka Offset Monitor로 kafka 상태 모니터링 하기 file 2016.11.08 1540
24 Flume과 Kafka를 사용한 초당 100만개 로그 수집 테스트 file 2016.10.31 1422
23 운영중인 상태에서 kafka topic삭제하고 재생성하여 처리되지 않은 메세지 모두 삭제하기 2016.10.24 1245
22 producer / consumer구현시 설정 옵션 설명 2016.10.19 777
21 No broker partitions consumed by consumer thread오류 발생시 확인/조치할 사항 2016.09.02 1020
20 kafka 0.9.0.1버젼의 producer와 kafka버젼이 0.10.0.1인 consumer가 서로 대화하는 모습 2016.08.18 529
위로