메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


spark 테스트 프로그램 몇개

package com.gooper.test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaSparkPi {
	
	static final SparkConf sparkConf=new SparkConf().setAppName("JavaSparkPi");
	static {
		sparkConf.setMaster("spark://gsda1:7077,gsda2:7077");
		sparkConf.setMaster("local[*]");

	}
    
	static final JavaSparkContext jsc=new JavaSparkContext(sparkConf);
	static  final Broadcast<List<String>> temp = jsc.broadcast(Arrays.asList("hello world", "", "hi park", "", "sss ff"));

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start................");

	// PI값 구하기
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map((z) ->  {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        if(x * x + y * y <= 1)  return 1; else return 0;
    }).reduce((s, s2) -> (s + s2));

    System.out.println("Pi is roughly " + 4.0 * count / n);

    
    System.out.println("=========== test start =================================");
    test(jsc);
    System.out.println("=========== test end =================================");
    
    System.out.println("=========== test2 start =================================");
    test2(jsc);
    System.out.println("=========== test2 end =================================");
    
    System.out.println("=========== test3 start =================================");
    test3(jsc);
    System.out.println("=========== test3 end =================================");

    System.out.println("=========== test4 start =================================");
    test4(jsc);
    System.out.println("=========== test4 end =================================");

    System.out.println("=========== test5 start =================================");
    test5(jsc);
    System.out.println("=========== test5 end =================================");

    System.out.println("=========== test6 start =================================");
    test6(jsc);
    System.out.println("=========== test6 end =================================");

    System.out.println("=========== test7 start =================================");
    test7(jsc);
    System.out.println("=========== test7 end =================================");

    System.out.println("=========== test8 start =================================");
    test8(jsc);
    System.out.println("=========== test8 end ================================="); 

    System.out.println("=========== test9 start =================================");
    test9(jsc);
    System.out.println("=========== test9 end =================================");

    System.out.println("=========== test10 start =================================");
    test10(jsc);
    System.out.println("=========== test10 end =================================");

    System.out.println("=========== test11 start =================================");
    test11(jsc);
    System.out.println("=========== test11 end =================================");

    System.out.println("=========== test12 start =================================");
    test12(jsc);
    System.out.println("=========== test12 end =================================");

    System.out.println("=========== test13 start =================================");
    test13(jsc);
    System.out.println("=========== test13 end =================================");

    System.out.println("=========== test14 start =================================");
    test14(jsc);
    System.out.println("=========== test14 end =================================");

    System.out.println("=========== test15 start =================================");
    test15();
    System.out.println("=========== test15 end =================================");

    System.out.println("=========== test16 start =================================");
    test16(jsc);
    System.out.println("=========== test16 end =================================");

    System.out.println("=========== test17 start =================================");
    test17(jsc);
    System.out.println("=========== test17 end =================================");

    System.out.println("=========== test18 start =================================");
    test18(jsc);
    System.out.println("=========== test18 end =================================");

    System.out.println("=========== test19 start =================================");
    test19(jsc);
    System.out.println("=========== test19 end =================================");
    
    jsc.stop();
    jsc.close();
    
    System.out.println("end................");
  }
  
  // List를 RDD로 변환하고 map연상을 통해서 x*x한 값으로 구성된 JavaRDD를 만들어서 화면에 출력 
  static void test (JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  JavaRDD<Integer> result = rdd.map((x) -> {return x*x;});
	  
	  System.out.println("result ==>"+StringUtils.join(result.collect(), ","));
  }
  
  // 문자열을 space로 분리(람다식을 이용함)하여 화면에 출력
  static void test2 (JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi"));
	  JavaRDD<String> words = lines.flatMap( (line) -> {return Arrays.asList(line.split(" ")).iterator();});
	  
	  System.out.println("result ==>"+StringUtils.join(words.collect(), ","));
  }
  
  
  // RDD의 함수인 disticnt, union, intersection, subtract, cartesian, countByValue를 적용하여 화면에 결과 출력 
  static void test3 (JavaSparkContext sc) {
	  JavaRDD<String> data1 = sc.parallelize(Arrays.asList("coffee", "coffee", "panda", "monkey", "tea"));
	  JavaRDD<String> data2 = sc.parallelize(Arrays.asList("coffee", "monkey", "kitty"));
	  JavaRDD<Integer> data3 = sc.parallelize(Arrays.asList(1,2,3));
	  
	  System.out.println("distinct ==>"+data1.distinct().collect());
	  System.out.println("union ==>"+data1.union(data2).collect());
	  System.out.println("intersection ==>"+data1.intersection(data2).collect());
	  System.out.println("subtract ==>"+data1.subtract(data2).collect());
	  System.out.println("cartesion ==>"+data1.cartesian(data3).collect());
	  System.out.println("countByValue ==>"+data1.countByValue());
  }
  
  
  // persist를 사용하여 reduce, fold등를 적용해보고, double형의 RDD로 변환하여 mean값을 구하여 출력함
  static void test4(JavaSparkContext sc) {
	  JavaRDD<Integer> data1 = sc.parallelize(Arrays.asList(1,2,3,4));
	  JavaRDD<Integer> data2 = sc.parallelize(Arrays.asList(3,4,5));
	  
	  List<Integer> data3 = new ArrayList<Integer>();
	  data3.add(1);
	  data3.add(2);
	  data3.add(3);
	  data3.add(4);
	  
	  JavaRDD<Integer> map = data1.map(x -> x+1);
	  map.persist(StorageLevel.MEMORY_AND_DISK());
	  
	  Function2<Integer, Integer, Integer> reduce = new Function2<Integer, Integer, Integer>() {
		 public Integer call(Integer x, Integer y) {
			 return x+y;
		 }
	  };
	  
	  DoubleFunction<Integer> df = new DoubleFunction<Integer>() {
		  public double call(Integer x) {
			  return (double) x;
		  }
	  };
	  
	  System.out.println("map==>"+data1.map(x -> x+1).reduce((x, y)->{return x+y;}));
	  
	  System.out.println("fold==>"+map.fold(0, reduce));
	  
	  map.foreach((x)->System.out.println(x));
	  
	  JavaDoubleRDD result = data1.mapToDouble((x) -> x);
	  System.out.println("mean ===>"+result.mean());
	  result.foreach((x) -> System.out.println(x));
	  
	  System.out.println("--------------------------");
	  
	  JavaDoubleRDD result2 = map.mapToDouble(df);
  	  System.out.println("mean by DoubleFuntion()===>"+result2.mean());
	  result2.foreach((x) -> System.out.println(x)); 

  }
  
  
  // 숫자형의 JavaRDD를 이용하여 aggregate함수를 사용하는 예제
  static void test5(JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  
	  Function2<AvgCount, Integer, AvgCount> addAndCount =
			  new Function2<AvgCount, Integer, AvgCount> (){
				private static final long serialVersionUID = 122222L;

				public AvgCount call(AvgCount a, Integer x) {
			  		a.total += x;
			  		a.num += 1;
			  		return a;
			  	}
			  };

	  Function2<AvgCount, AvgCount, AvgCount> combine =
	  new Function2<AvgCount, AvgCount, AvgCount> (){
		private static final long serialVersionUID = 11111L;

		public AvgCount call(AvgCount a, AvgCount b) {
	  		a.total += b.total;
	  		a.num += b.num;
	  		return a;
	  	}
	  };

	  AvgCount initial = new AvgCount(0, 0);
	  AvgCount result = rdd.aggregate(initial, addAndCount, combine);
      System.out.println(result.avg());
  }
  
  
  // 문자열 RDD를 key/value형태로 바꾸고 key를 기준을 작동하는 sortByKey, reduceByKey, groupByKey, sortByKey를 적용해보는 예제
  static void test6(JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi", "hi park", "dk"));
	  
	PairFunction<String, String, Integer> keyData = 
		new PairFunction<String, String, Integer>() {
		public Tuple2<String, Integer> call(String x) {
			return new Tuple2(x.split(" ")[0], x.length());
		}
	};
	
	JavaPairRDD<String, Integer> pairs = lines.mapToPair(keyData);
	pairs.foreach(x->System.out.println(x));
	
	JavaPairRDD<String, Integer> reduceByKey = pairs.reduceByKey(   (x, y) -> { return (x+y);} );
	JavaPairRDD<String, Iterable<Integer>> groupByKey = pairs.groupByKey();

	JavaPairRDD<String, Integer> sortByKey = pairs.sortByKey(false);
	
	System.out.println("reduceByKey =>"+reduceByKey.collect() );
	
	System.out.println("groupByKey =>"+groupByKey.collect() );
	
	System.out.println("sortBykey => "+sortByKey.collect() );
  }

  //   Tuple2형의 RDD를 이용하여 mapToPair를 적용하여 JavaPairRDD를 만들면서 각각의 RDD를 출력해보는 예제
  //  두개의 JavaPairRDD를 이용하여 subtractByKey, subtract, rightOuterJoin, leftOuterJoin, cogroup를 적용해보는 예제 
  static void test7(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  data2.add(new Tuple2("c",4));
	  
	  JavaRDD<Tuple2<String, Integer>> pdataa1 = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdataa11 = pdataa1.mapToPair(  (x) -> {return new Tuple2(x._1, x._2);}  );
	  
	  System.out.println("pdataa1 ==>"+pdataa1);
	  System.out.println("pdataa11 ==>"+pdataa11);
	  
	  System.out.println("pdataa1 ==>"+pdataa1.collect());
	  System.out.println("pdataa11 ==>"+pdataa11.collect());
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, Integer> pdata2 = sc.parallelizePairs(data2);
	  
	  System.out.println("pdata1 ==>"+pdata1.collect());
	  System.out.println("pdata2 ==>"+pdata2.collect());

	System.out.println("subtractByKey =>"+pdataa11.subtractByKey(pdata2).collect());
	System.out.println("subtract =>"+pdata1.subtract(pdata2).collect());
	
	System.out.println("join =>"+pdata1.join(pdata2).collect());
	System.out.println("rightOuterJoin =>"+pdata1.rightOuterJoin(pdata2).collect());
	System.out.println("leftOuterJoin =>"+pdata1.leftOuterJoin(pdata2).collect());
	
	System.out.println("cogroup =>"+pdata1.cogroup(pdata2).collect());

	Function<Integer, Integer> ff = new Function<Integer, Integer>() {
		private static final long serialVersionUID = 11234L;
		int sum = 100;
		public Integer call (Integer x) {
			sum += x;
			return sum;
		}
	};
	
	System.out.println("mapValues =>" + pdata1.mapValues(ff).collect()); 
	
	//System.out.println("mapValues 1=>" + pdata1.reduce( (x2._2) -> {return x2._2;})           );
	System.out.println("reduce =>" + pdata1.reduce( (x2, y2) -> {return new Tuple2("sum of all elements", (x2._2+ y2._2) );})           );
  }
  
  // String형의 List를 JavaRDD를 만들고 이를 JavaPairRDD로 변환후 reduceByKey를 적용하는 예제
  static void test8(JavaSparkContext sc) {
	  List<String> data1 = new ArrayList<String>();
	  data1.add("ab");
	  data1.add("abcd");
	  data1.add("ab");
	  data1.add("cd");
	  
	  JavaRDD<String> pdata = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdata1 = pdata.mapToPair(x-> {return new Tuple2(x, 1);});
	  System.out.println("mapToPair==>"+pdata1.collect());
	  
	  JavaPairRDD<String, Integer> pdata2 = pdata1.reduceByKey( (x, y)-> { return (x+y); } );
	  System.out.println("reduceByKey==>"+pdata2.collect() );

  }
  
  //   sc.parallelizePairs를 이용하여  JavaPairRDD를 만들고 combineByKey 적용하고 Map으로 collect후에 출력하는 예제
  static void test9(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));
	  data1.add(new Tuple2("a", 10));
	  data1.add(new Tuple2("c", 9));
	  
	  JavaPairRDD<String, Integer> pdata = sc.parallelizePairs(data1);
	  
	  Function<Integer, AvgCount2> createCombiner = new Function<Integer, AvgCount2>() {
			public AvgCount2 call(Integer x) {
				return new AvgCount2(x, 1);
			}
		};

		Function2<AvgCount2, Integer, AvgCount2> mergeValue =
			new Function2<AvgCount2, Integer, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, Integer x) {
				a.total_ += x;
				a.num_ += 1;
				return a;
			}
		};

		Function2<AvgCount2, AvgCount2, AvgCount2> mergeCombiner =
			new Function2<AvgCount2, AvgCount2, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, AvgCount2 b) {
				a.total_ += b.total_;
				a.num_ += b.num_;
				return a;
			}
		};
	  
	  JavaPairRDD<String, AvgCount2> avgCounts =  pdata.combineByKey(createCombiner, mergeValue, mergeCombiner);
	  Map<String, AvgCount2> countMap = avgCounts.collectAsMap();

	  for(Entry<String, AvgCount2> entry : countMap.entrySet()) {
	  	System.out.println(entry.getKey() + ":" + entry.getValue().avg());
	  }
  
  }
  
  // 두개의 Tuple2형 List를 이용하여 sc.paralledizePairs를 이용하여 JavaPairRDD로 변환하고 leftOuterJoin를 적용하는 예제
  static void test10(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("x", 10));	  
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));

	  
	  
	  List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
	  data2.add(new Tuple2("a", "aa"));
	  data2.add(new Tuple2("b", "bb"));
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, String> pdata2 = sc.parallelizePairs(data2);
	  
	  pdata1.sortByKey(true);
	  
	 JavaPairRDD<String, Tuple2<Integer,Optional<String>>> result = pdata1.leftOuterJoin(pdata2);

	 System.out.println("pdata1==>"+pdata1.collect());
	 System.out.println("pdata2==>"+pdata2.collect());
	 
	 System.out.println("result==>"+result.collect());
  
  }
  
  // test.json파일을 일거서 메모리에 적재하고 데이타를 파싱하여 Person1 객체에 담아 partition별로 map작업을 수행고 HDFS에 저장하는 예제  
  static void test11(JavaSparkContext sc) {
	  String dir = "formatted-out";
	  
   	 JavaRDD<String> input = sc.textFile("file:///tmp/test.json", 5).persist(StorageLevel.MEMORY_ONLY_2());
   	 
   	 JavaRDD<Person1> result = input.mapPartitions(new ParseJson());
   	 
   	 System.out.println("persons from json ===>"+result.collect());
   	 
   	 JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
   	 
   	 delete_dir(dir);
   	 //mkdir_dir();
   	
   	 formatted.saveAsTextFile(dir);
  
  }
  
  // Tuple2형의 List를 key/value형의 JavaPairRDD로 변환하고 ConvertToWritableTypes 객체를 이용하여 IntWritable형으로 변환하여
  // SequenceFileOutputFormat으로 저장하는 예제
  static void test12(JavaSparkContext sc) {
	  String dir = "sequence-write";
	  
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data1);
	  
	  JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
	  
	  delete_dir(dir);
	  
	  System.out.println("Native Values before ==>"+data1.toString());
	   	 
	  result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class,	SequenceFileOutputFormat.class);
	  System.out.println("Saved as SequenceFileOutputFormat.class");
  }
  
  // Writable형의 저장되어 있는 sequence format의 data를 읽어서 원래의 값으로 변환하는 예제
  static void test13(JavaSparkContext sc) {
	  String fileName = "sequence-write";
	  JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
	  JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
	  
	  System.out.println("Native Values after ====>"+result.collect());
  }
  
  
  // HIveContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test14(JavaSparkContext sc) {
	  HiveContext ctx = new HiveContext(sc);
	  
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #1 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // SparkSession을 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0이상에서는 이것만 유효함)
  static void test15() {
	  SparkSession session = new SparkSession.Builder().appName("SparkJoinExample").master("local").enableHiveSupport().getOrCreate();
      Dataset<Row> dset = session.sql("select * from default.test_table");
      
	  System.out.println("select result #2 =>"+dset.toJavaRDD().collect());
  }

  // SQLContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test16(JavaSparkContext sc) {
	  SQLContext ctx = new SQLContext(sc);
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #3 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // accumulator 변수를 사용하는 예제
  static void test17(JavaSparkContext sc) {
	  final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
				
	  			delete_dir("output.txt");
	  
				callSigns.saveAsTextFile("output.txt");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // accumulator와 broadcast변수를 동시에 사용하는 예제
  static void test18(JavaSparkContext sc) {

      final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					  System.out.println("str in broadcasted  ==>"+temp.value());
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
	  
	  			delete_dir("output.txt2");
				
				callSigns.saveAsTextFile("output.txt2");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // 문자형 List를 JavaRDD로 변환하고 mapToDobule를 이용하여 double형으로 변환하고 집계함수인 sum, mean, variance, stdev등을 적용해보는 예제
  static void test19(JavaSparkContext sc) {
	  JavaRDD<String> age = sc.parallelize(Arrays.asList("1","2","3","4","5"));
	  JavaDoubleRDD doubleAge = age.mapToDouble(new DoubleFunction<String>() {
		  public double call(String value) {
			  return Double.parseDouble(value);
		  }});
	  
	  System.out.println("sum = "+doubleAge.sum());
	  System.out.println("mean ="+doubleAge.mean());
	  System.out.println("variance ="+doubleAge.variance());
	  System.out.println("stdev ="+doubleAge.stdev());
  }
 
  // Tuple2<Text, IntWritable>값을 받아서 String형 key와 Integer형 value로 변환하는 클래스
  static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
	  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
		  return new Tuple2(record._1.toString(), record._2.get());
	  }
  }
  
  // String형의 key와 Integer형의 Tuple2를 받아서 Text형의 key와 IntWritable형의 Tuple2값으로 변환하는 클래스  
  static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
	  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
		  return new Tuple2(new Text(record._1), new IntWritable(record._2));
	  }
  }
  
  // HDFS상의 폴더밑 하위 파일을 지우는 함수
  static void delete_dir(String ff) {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name",  "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
	  
		  System.out.println("Home Path : " + dfs.getHomeDirectory());
		  System.out.println("Work Path : " + dfs.getWorkingDirectory());
		  
		  Path dir = new Path (ff);
		  if(dfs.exists(dir)) {
			  dfs.delete(dir, true);
		  }
	  } catch (Exception e) {
		  System.out.println("delete dir error ==>"+e);
	  }
  }
  
  // HDFS상의 폴더를 생성하는 함수
  static void mkdir_dir() {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name", "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
		  
		  Path dir = new Path ("formatted-out");
		  if( ! dfs.exists(dir)) {
			  dfs.mkdirs(dir);
		  }
	  } catch (Exception e) {
		  System.out.println("mk dir error ==>"+e);
	  }
  }
  
  
  
}


// json데이타를 Person1형의 iterator로 변환하는 클래스
class ParseJson implements FlatMapFunction<Iterator<String>, Person1> {
	public Iterator<Person1> call(Iterator<String> lines) throws Exception {
		ArrayList<Person1> people = new ArrayList<Person1>();
		
		Gson mapper = new Gson();
		while (lines.hasNext()) {
			String line = lines.next();
			try {
				System.out.println("line => "+line);
				Person1 person1 = mapper.fromJson(line, Person1.class);
				System.out.println("person1=>"+person1);
				people.add(person1);
			} catch (Exception e) {
				// 무시함
			}
		}
		return people.iterator();
	}
}

// Person1형의 데이타를 String형의 iterator로 변환하는 클래스
class WriteJson implements FlatMapFunction<Iterator<Person1>, String> {
	public Iterator<String> call(Iterator<Person1> people) throws Exception {
		ArrayList<String> text = new ArrayList<String>();
		while(people.hasNext()) {
			Person1 person = people.next();
			text.add("new string =>"+person.toString());
		}
		return text.iterator();
	}
}

class Person1 implements Serializable {
	String name;
	int age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "Person1 [name=" + name + ", age=" + age + "]";
	}
	
}

// 평균값을 구하는 클래스 1
class AvgCount implements Serializable {
	private static final long serialVersionUID = 134444L;
	public AvgCount (int total, int num) {
		this.total = total;
		this.num = num;
	}
	
	public int total;
	public int num;
	public double avg() {
		return total / (double) num;
	}
}


//평균값을 구하는 클래스 2
class AvgCount2 implements Serializable {
	private static final long serialVersionUID = -1683922668212126392L;
	public AvgCount2(int total, int num) { total_= total; num_=num;}
	
	public int total_;
	public int num_;
	public float avg() {
		return total_ / (float) num_;
	}
}


번호 제목 날짜 조회 수
200 Ubuntu 16.04 LTS에 Hive 2.1.1설치하면서 "Version information not found in metastore"발생하는 오류원인및 조치사항 2017.05.03 657
199 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적(?)으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 217
198 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 227
197 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 156
196 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 2017.05.02 294
195 Cleaning up the staging area file시 'cannot access' 혹은 'Directory is not writable' 발생시 조치사항 2017.05.02 451
194 Ubuntu 16.04 LTS에 MariaDB 10.1설치 및 포트변경 및 원격접속 허용 2017.05.01 1678
193 Ubuntu 16.04 LTS에 4대에 Hadoop 2.8.0설치 2017.05.01 1130
192 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 472
191 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 459
190 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 907
189 Spark에서 Serializable관련 오류및 조치사항 2017.04.21 5107
188 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 2017.04.19 850
187 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 2017.04.06 1006
186 streaming작업시 입력된 값에 대한 사본을 만들게 되는데 이것이 실패했을때 발생하는 경고메세지 2017.04.03 823
185 JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 2017.03.30 287
184 kafka-manager 1.3.3.4 설정및 실행하기 2017.03.20 1771
» spark 2.0.0의 api를 이용하는 예제 프로그램 2017.03.15 349
182 It is indirectly referenced from required .class files 오류 발생시 조치방법 2017.03.09 856
181 spark2.0.0에서 hive 2.0.1 table을 읽어 출력하는 예제 소스(HiveContext, SparkSession, SQLContext) 2017.03.09 364
위로