Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.
1. data준비
(각 File1, File2는 hdfs상에 존재해야한다,
즉, hadoop fs -put UserDetails.txt DeliveryDetails.txt /data1/hadoop/mr/in을 실행한다)
File 1 – UserDetails.txt(mobile#, 사용자이름)
123 456, Jim
456 123, Tom
789 123, Harry
789 456, Richa
File 2 – DeliveryDetails.txt(mobile#, 상태코드)
123 456, 001
456 123, 002
789 123, 003
789 456, 004
File 3 – DeliveryStatusCodes.txt(상태코드, 상태코드명)
001, Delivered
002, Pending
003, Failed
004, Resend
* File3은 os 파일시스템에 존재해야한다.
(예, /home/hadoop/hadoop/working/DeliveryStatusCodes.txt)
2. 예상되는 최종 결과 포맷
Jim, Delivered
Tom, Pending
Harry, Failed
Richa, Resend
3. UserFileMapper.java
package com.gooper.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class UserFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private String cellNumber, customerName, fileTag="CD~";
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String splitarray[] = line.split(",");
cellNumber = splitarray[0].trim();
customerName = splitarray[1].trim();
private String cellNumber, customerName, fileTag="CD~";
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String splitarray[] = line.split(",");
cellNumber = splitarray[0].trim();
customerName = splitarray[1].trim();
// reducer에서의 구분을 위해서 value값 앞에 "CD~"를 붙여준다.
output.collect(new Text(cellNumber), new Text(fileTag+customerName));
}
}
output.collect(new Text(cellNumber), new Text(fileTag+customerName));
}
}
4. DeliverFileMapper.java
package com.gooper.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class DeliverFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private String cellNumber, deliveryCode, fileTag="DR~";
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String splitarray[] = line.split(",");
cellNumber = splitarray[0].trim();
deliveryCode = splitarray[1].trim();
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String splitarray[] = line.split(",");
cellNumber = splitarray[0].trim();
deliveryCode = splitarray[1].trim();
// reducer에서의 구분을 위해서 value값 앞에 "DR~"를 붙여준다.
output.collect(new Text(cellNumber), new Text(fileTag+deliveryCode));
}
}
output.collect(new Text(cellNumber), new Text(fileTag+deliveryCode));
}
}
5. SmsDriver.java
package com.gooper.join;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class SmsReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
private String customerName, deliveryReport;
private static Map<String,String> DeliveryCodesMap = new HashMap<String,String>();
public void configure(JobConf job) {
loadDeliveryStatusCodes();
}
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text,Text> output, Reporter reporter) throws IOException {
while(values.hasNext()) {
String currValue = values.next().toString();
String valueSplitted[] = currValue.split("~");
String keyValue = key.toString();
System.out.println("키값 , 라인값 : "+keyValue+","+currValue);
//System.out.println("reporter값 : "+reporter.toString());
// CD로 시작되면 고객명을 그대로 사용하고..
if(valueSplitted[0].equals("CD")) {
customerName = valueSplitted[1].trim();
// DR로 시작되면 code값이므로 code에 대한 값을 찾아서 그 명칭을 출력한다.
} else if(valueSplitted[0].equals("DR")) {
deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
}
}
if(customerName != null && deliveryReport != null) {
output.collect(new Text(customerName+"("+key+")"), new Text(deliveryReport));
} else if(deliveryReport == null) { // codeㅇ mapping되는 값이 없으면 "deliveryReport"라는 문자열 자체를 출력한다.
output.collect(new Text(customerName+"("+key+")"), new Text("deliveryReport"));
}
}
private void loadDeliveryStatusCodes() {
String strRead;
try {
//BufferedReader reader = new BufferedReader(new FileReader("/data1/hadoop/mr/in/DeliveryStatusCodes.txt"));
// 여기는 로컬 os파일 위치를 지정해야 함..
BufferedReader reader = new BufferedReader(new FileReader("/home/hadoop/hadoop/working/DeliveryStatusCodes.txt"));
while((strRead = reader.readLine() ) != null) {
String splitarray[] = strRead.split(",");
DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim()) ;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch(IOException e) {
e.printStackTrace();
}
}
}
private String customerName, deliveryReport;
private static Map<String,String> DeliveryCodesMap = new HashMap<String,String>();
public void configure(JobConf job) {
loadDeliveryStatusCodes();
}
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text,Text> output, Reporter reporter) throws IOException {
while(values.hasNext()) {
String currValue = values.next().toString();
String valueSplitted[] = currValue.split("~");
String keyValue = key.toString();
System.out.println("키값 , 라인값 : "+keyValue+","+currValue);
//System.out.println("reporter값 : "+reporter.toString());
// CD로 시작되면 고객명을 그대로 사용하고..
if(valueSplitted[0].equals("CD")) {
customerName = valueSplitted[1].trim();
// DR로 시작되면 code값이므로 code에 대한 값을 찾아서 그 명칭을 출력한다.
} else if(valueSplitted[0].equals("DR")) {
deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
}
}
if(customerName != null && deliveryReport != null) {
output.collect(new Text(customerName+"("+key+")"), new Text(deliveryReport));
} else if(deliveryReport == null) { // codeㅇ mapping되는 값이 없으면 "deliveryReport"라는 문자열 자체를 출력한다.
output.collect(new Text(customerName+"("+key+")"), new Text("deliveryReport"));
}
}
private void loadDeliveryStatusCodes() {
String strRead;
try {
//BufferedReader reader = new BufferedReader(new FileReader("/data1/hadoop/mr/in/DeliveryStatusCodes.txt"));
// 여기는 로컬 os파일 위치를 지정해야 함..
BufferedReader reader = new BufferedReader(new FileReader("/home/hadoop/hadoop/working/DeliveryStatusCodes.txt"));
while((strRead = reader.readLine() ) != null) {
String splitarray[] = strRead.split(",");
DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim()) ;
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch(IOException e) {
e.printStackTrace();
}
}
}
6. 설명
가. SmsDriver.java에서
MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, DeliverFileMapper.class);
MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, DeliverFileMapper.class);
을 이용하여 각각의 파일을 읽어 들인다.
이때 읽어들이는 처리는 UserFileMapper와 DeliverFileMapper가 각각 맡는다
이때 join key는 mobile phone no가 된다.
나. 두개의 input파일을 reduce에게 보내면 reducer에서는 key를 기준으로 value가 각각의 값을 가지고 있으므로
code값인 경우는 HashMap에 저장된 code정보를 mapping하여 code에 대한 명칭을 output하게 만든다.
최종적으로는 이름과 상태값을 가지는 결과 파일이 만들어진다.
다. Reducer에슨 각각의 구분이 필요하므로 Mapper에서 각각의 값에 "CD~" 혹은 "DR~"을 붙여 구분할 수 있도록 선처리를 한다
라. 파일을 build하고 jar로 만들고 다음과 같이 실행한다.
hadoop@bigdata-host:~/hadoop/working$ hadoop jar gooper-hadoopexamples.jar com.gooper.join.SmsDriver /data1/hadoop/mr/in/UserDetails.txt /data1/hadoop/mr/in/DeliveryDetails.txt /data1/hadoop/mr/out/join/d
마. 실행 결과 확인
hadoop@bigdata-host:~/hadoop/working$ hadoop fs -cat /data1/hadoop/mr/out/join/d/part-00000
Jim(123 456) Delivered
Tom(456 123) Pending
Harry(789 123) Failed
Richa(789 456) Resend
Jim(123 456) Delivered
Tom(456 123) Pending
Harry(789 123) Failed
Richa(789 456) Resend
바. reducer에 인입되는 key와 value를 확인하면 아래와 같다.
키값 , 라인값 : 123 456,CD~Jim
키값 , 라인값 : 123 456,DR~001
키값 , 라인값 : 456 123,DR~002
키값 , 라인값 : 456 123,CD~Tom
키값 , 라인값 : 789 123,CD~Harry
키값 , 라인값 : 789 123,DR~003
키값 , 라인값 : 789 456,DR~004
키값 , 라인값 : 789 456,CD~Richa
키값 , 라인값 : 123 456,DR~001
키값 , 라인값 : 456 123,DR~002
키값 , 라인값 : 456 123,CD~Tom
키값 , 라인값 : 789 123,CD~Harry
키값 , 라인값 : 789 123,DR~003
키값 , 라인값 : 789 456,DR~004
키값 , 라인값 : 789 456,CD~Richa