1. Download the Flume installation file
apache-flume-1.5.2-bin.tar.gz
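(If a direct download command is needed, the 1.5.2 release should still be available from the Apache archive; the URL below is an assumption, so verify it against a current mirror.)
wget https://archive.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz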
2. Extract the archive
tar xvfz apache-flume-1.5.2-bin.tar.gz
3. Create a symbolic link
ln -s apache-flume-1.5.2-bin flume
4. Update environment variables (vi /home/hadoop/.bashrc)
export FLUME_HOME=/hadoop/flume
export PATH=$PATH:$FLUME_HOME/bin
* Apply the changes: source /home/hadoop/.bashrc
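As a quick sanity check that the PATH change took effect, the flume-ng script should now resolve and report the version (output abbreviated):
[hadoop@master]$ flume-ng version
Flume 1.5.2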
5. Edit the Flume configuration
cd $FLUME_HOME/conf
cp flume-conf.properties.template flume.conf
vi flume.conf
agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = exec
agent.sources.seqGenSrc.command = tail -F /home/bigdata/hadoop-1.2.1/logs/hadoop-hadoop-namenode-localhost.localdomain.log
# Tails a NameNode log; set this way for testing in a pseudo-distributed environment.
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
# Test path (a trailing # is not treated as a comment in a properties value, so it is kept on its own line)
agent.sinks.hdfsSink.hdfs.path = hdfs://mycluster/flume/data
agent.sinks.hdfsSink.hdfs.rollInterval = 30
agent.sinks.hdfsSink.hdfs.batchSize = 100
#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100000
agent.channels.memoryChannel.transactionCapacity = 10000
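Note: with only the settings above, the HDFS sink writes SequenceFiles by default. For a plain-text tail test like this one, the sink's standard hdfs.fileType/hdfs.writeFormat options can be added so the output is readable text:
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat = Text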
6. Start the agent
[hadoop@master]$ flume-ng agent --conf-file ./flume.conf --name agent
Info: Including Hadoop libraries found via (/usr/local/hadoop/bin/hadoop) for HDFS access
Info: Excluding /usr/local/hadoop/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath
Info: Including HBASE libraries found via (/usr/local/hbase/bin/hbase) for HBASE access
Info: Excluding /usr/local/hbase/lib/slf4j-api-1.6.4.jar from classpath
Info: Excluding /usr/local/hbase/lib/slf4j-log4j12-1.6.4.jar from classpath
Info: Excluding /usr/local/hadoop/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar from classpath
.....
15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/05/21 17:38:57 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume.conf
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Added sinks: hdfsSink Agent: agent
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Processing:hdfsSink
15/05/21 17:38:57 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Creating channels
15/05/21 17:38:57 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
15/05/21 17:38:57 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
15/05/21 17:38:57 INFO source.DefaultSourceFactory: Creating instance of source seqGenSrc, type exec
15/05/21 17:38:57 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfsSink, type: hdfs
15/05/21 17:38:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/21 17:38:58 INFO hdfs.HDFSEventSink: Hadoop Security enabled: false
15/05/21 17:38:58 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [seqGenSrc, hdfsSink]
15/05/21 17:38:58 INFO node.Application: Starting new configuration:{ sourceRunners:{seqGenSrc=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:seqGenSrc,state:IDLE} }} sinkRunners:{hdfsSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3b201837 counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
15/05/21 17:38:58 INFO node.Application: Starting Channel memoryChannel
15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
15/05/21 17:38:58 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
15/05/21 17:38:58 INFO node.Application: Starting Sink hdfsSink
15/05/21 17:38:58 INFO node.Application: Starting Source seqGenSrc
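(To also see the agent's own log messages on the console while testing, the standard flume-ng options below can be used; --conf points at the directory holding flume-env.sh and log4j.properties.)
[hadoop@master]$ flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume.conf --name agent -Dflume.root.logger=INFO,console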
7. Verify in HDFS
Append test data to the file the exec source is tailing, e.g. cat aaa >> /home/hadoop/test.log (this only works if agent.sources.seqGenSrc.command is pointed at that file; otherwise append to the NameNode log configured above), then check with the command below.
[hadoop@master]$ hadoop fs -lsr /flume
drwxr-xr-x - hadoop supergroup 0 2015-05-21 17:39 /flume/data
-rw-r--r-- 3 hadoop supergroup 208 2015-05-21 17:39 /flume/data/FlumeData.1432197542415
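The file contents can be checked as well; the FlumeData file name is timestamp-based and will differ per run, and a file still being written carries a .tmp suffix until it rolls:
[hadoop@master]$ hadoop fs -cat /flume/data/FlumeData.1432197542415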