1. Download the Flume installation file
apache-flume-1.5.2-bin.tar.gz
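The tarball for this older release is kept on the Apache archive; it can be fetched directly, for example (URL assumed to still be live, adjust if needed):
wget https://archive.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz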
2. Extract the archive
tar xvfz apache-flume-1.5.2-bin.tar.gz
3. Create a symlink
ln -s apache-flume-1.5.2-bin flume
4. Update environment variables (vi /home/hadoop/.bashrc)
export FLUME_HOME=/hadoop/flume
export PATH=$PATH:$FLUME_HOME/bin
* Apply the changes: source /home/hadoop/.bashrc
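A quick sanity check that the variables took effect (flume-ng comes from the bin directory added to PATH above):
echo $FLUME_HOME
flume-ng version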
5. Edit the Flume configuration
cd $FLUME_HOME/conf
cp flume-conf.properties.template flume.conf
vi flume.conf
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.command = tail -F /home/bigdata/hadoop-1.2.1/logs/hadoop-hadoop-namenode-localhost.localdomain.log
# For testing in a pseudo-distributed environment, the NameNode log is tailed.
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
# Test path; an inline comment after the value would be parsed as part of the value
agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/flume/data
agent.sinks.hdfsSink.hdfs.rollInterval = 30
agent.sinks.hdfsSink.hdfs.batchSize = 100
# Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 10000
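By default the HDFS sink writes SequenceFiles. If plain-text output is wanted instead, the standard HDFS-sink properties below can be added (not part of the original test, just a suggestion):
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat = Text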
6. Start the agent
[hadoop@master]$ flume-ng agent --conf-file ./flume.conf --name agent
Info: Including Hadoop libraries found via (/usr/local/flume/bin/hadoop) for HDFS access
Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath
Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access
Info: Excluding /usr/local/hbase/lib/slf4j-api-1.6.4.jar from classpath
Info: Excluding /usr/local/hbase/lib/slf4j-log4j12-1.6.4.jar from classpath
Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/local/flume/share/usr/local/common/lib/slf4j-log4j12-1.7.5.jar from classpath
.....
15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume.conf
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Added sinks: hdfsSink Agent: agent
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Creating channels
15/05/21 17:38:57 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
15/05/21 17:38:57 INFO source.DefaultSourceFactory: Creating instance of source seqGenSrc, type exec
15/05/21 17:38:57 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfsSink, type: hdfs
15/05/21 17:38:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/21 17:38:58 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
15/05/21 17:38:58 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [seqGenSrc, hdfsSink]
15/05/21 17:38:58 INFO node.Application: Starting new configuration:{ sourceRunners:{seqGenSrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:seqGenSrc,state:IDLE} }} sinkRunners:{hdfsSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3b201837 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
15/05/21 17:38:58 INFO node.Application: Starting Channel memoryChannel
15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
15/05/21 17:38:58 INFO node.Application: Starting Sink hdfsSink
15/05/21 17:38:58 INFO node.Application: Starting Source seqGenSrc
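For anything beyond a quick foreground test, the agent is usually started with --conf pointing at the conf directory (so flume-env.sh and log4j.properties are picked up) and pushed to the background; a minimal sketch, assuming flume.conf sits in $FLUME_HOME/conf:
nohup flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume.conf --name agent > flume-agent.out 2>&1 &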
7. Verify in HDFS
Push some data into the tailed file, e.g. cat aaa >> /home/hadoop/test.log (note that with the configuration above the exec source tails the NameNode log, so events flow whenever that log grows), then verify with the command below.
[hadoop@master]$ hadoop fs -lsr /flume
drwxr-xr-x - hadoop supergroup 0 2015-05-21 17:39 /flume/data
-rw-r--r-- 3 hadoop supergroup 208 2015-05-21 17:39 /flume/data/FlumeData.1432197542415
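Because the HDFS sink writes SequenceFiles by default, hadoop fs -cat shows binary headers; hadoop fs -text decodes the events (file name taken from the listing above):
[hadoop@master]$ hadoop fs -text /flume/data/FlumeData.1432197542415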