메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


1. flume설치파일 다운로드

apache-flume-1.5.2-bin.tar.gz


2. 압축풀기

tar xvfz apache-flume-1.5.2-bin.tar.gz


3. 링크생성

ln -s apache-flume-1.5.2-bin flume


4. 환경변수 수정(vi /home/hadoop/.bashrc)

export FLUME_HOME=/hadoop/flume

export PATH=$PATH:$FLUME_HOME/bin

* 변경사항 반영 : source /home/hadoop/.bashrc


5. Flume Conf

  cd $FLUME_HOME/conf

  cp flume-conf.properties.template flume.conf

  vi flume.conf


agent.sources = seqGenSrc

agent.channels = memoryChannel

agent.sinks = hdfsSink


# For each one of the sources, the type is defined

agent.sources.seqGenSrc.type = exec

agent.sources.seqGenSrc.command = tail -F /home/bigdata/hadoop-1.2.1/logs/hadoop-hadoop-namenode-localhost.localdomain.log

#가상분산환경에서 테스트용으로 잡은것.


# The channel can be defined as follows.

agent.sources.seqGenSrc.channels = memoryChannel


# Each sink's type must be defined

agent.sinks.hdfsSink.type = hdfs

agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/flume/data     #테스트용

agent.sinks.hdfsSink.rollInterval = 30

agent.sinks.hdfsSink.sink.batchSize = 100


#Specify the channel the sink should use

agent.sinks.hdfsSink.channel = memoryChannel


# Each channel's type is defined.

agent.channels.memoryChannel.type = memory


# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 100000

agent.channels.memoryChannel.transactionCapacity = 10000


6. agent기동

[hadoop@master]$ flume-ng agent -conf-file ./flume.conf --name agent

Info: Including Hadoop libraries found via (/usr/local/flume/bin/hadoop) for HDFS access

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath

Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access

Info: Excluding /usr/local/hbase/lib/slf4j-api-1.6.4.jar from classpath

Info: Excluding /usr/local/hbase/lib/slf4j-log4j12-1.6.4.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath

Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath

.....

15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting

15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume.conf

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Added sinks: hdfsSink Agent: agent

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink

15/05/21 17:38:57 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]

15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Creating channels

15/05/21 17:38:57 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory

15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Created channel memoryChannel

15/05/21 17:38:57 INFO source.DefaultSourceFactory: Creating instance of source seqGenSrc, type exec

15/05/21 17:38:57 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfsSink, type: hdfs

15/05/21 17:38:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/05/21 17:38:58 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false

15/05/21 17:38:58 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [seqGenSrc, hdfsSink]

15/05/21 17:38:58 INFO node.Application: Starting new configuration:{ sourceRunners:{seqGenSrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:seqGenSrc,state:IDLE} }} sinkRunners:{hdfsSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3b201837 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }

15/05/21 17:38:58 INFO node.Application: Starting Channel memoryChannel

15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.

15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started

15/05/21 17:38:58 INFO node.Application: Starting Sink hdfsSink

15/05/21 17:38:58 INFO node.Application: Starting Source seqGenSrc


7. hdfs확인

cat aaa >> /home/hadoop/test.log로 데이타를 넣고 아래 명령으로 확인해본다.


[hadoop@master]$ hadoop fs -lsr /flume

drwxr-xr-x   - hadoop supergroup          0 2015-05-21 17:39 /flume/data

-rw-r--r--   3 hadoop supergroup        208 2015-05-21 17:39 /flume/data/FlumeData.1432197542415

번호 제목 날짜 조회 수
641 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 185
640 http://blog.naver.com... 2017.06.23 186
639 HBase write 성능 튜닝 file 2017.07.18 186
638 특정 커밋 시점(commit id를 기준으로)으로 돌리기(reset) 2016.11.21 188
637 파일명 혹은 확장자 일괄 변경하는 방법 2017.01.26 188
636 HA(Namenode, ResourceManager, Kerberos) 및 보안(Zookeeper, Hadoop) 2018.03.16 188
635 슬라이딩 윈도우 예제 2016.07.28 189
634 천문학적, 기후학적, 기상학적, 생물학적, 농사계절 구분 2015.12.16 191
633 TopBraid Composer에서 SPIN 사용법 file 2016.02.25 191
632 ?a는 모두 표시하면서 ?b와 비교하여 ?a=?b는 ""로 하고 ?a!=?b 인경우는 해당값을 가지는 결과 집합을 구하는 경우 file 2016.01.29 195
631 [impala]insert into db명.table명 select a, b from db명.table명 쿼리 수행시 "Memory limit exceeded: Failed to allocate memory for Parquet page index"오류 조치 방법 2023.05.31 195
630 AnalysisException: Incomplatible return type 'DECIMAL(38,0)' and 'DECIMAL(38,5)' of exprs가 발생시 조치 2021.07.26 196
629 jdk 9이상 사용하려면 repository를 아래와 같이 지정해야한다. 2019.06.02 197
628 centos 6에서 mariadb 5.1 to 10.0 으로 upgrade 2016.11.01 201
627 eclipse editor 설정방법 2022.02.01 201
626 oneM2M Specification(Draft Release 3, 2, 1), Draft Technical Reports 2017.10.25 202
625 [Elephas] Jena Elephas를 이용하여 Spark에서 rdfTriples의 RDD를 만들고 RDD관련 작업하는 샘플소스 2016.08.10 203
624 파일은 남겨두고 파일 내용만 지우고자 할 때. 2017.08.30 205
623 fuseki의 endpoint를 이용한 insert, delete하는 sparql예시 2018.02.14 207
622 하둡 클러스터 전체 노드를 다시 기동하면 invalidate metadata를 수행해야 데이터가 틀어지지 않는다. 2019.05.20 208
위로