메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


spark 테스트 프로그램 몇개

package com.gooper.test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaSparkPi {
	
	static final SparkConf sparkConf=new SparkConf().setAppName("JavaSparkPi");
	static {
		sparkConf.setMaster("spark://gsda1:7077,gsda2:7077");
		sparkConf.setMaster("local[*]");

	}
    
	static final JavaSparkContext jsc=new JavaSparkContext(sparkConf);
	static  final Broadcast<List<String>> temp = jsc.broadcast(Arrays.asList("hello world", "", "hi park", "", "sss ff"));

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start................");

	// PI값 구하기
    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    int n = 100000 * slices;
    List<Integer> l = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      l.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
    
    int count = dataSet.map((z) ->  {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        if(x * x + y * y <= 1)  return 1; else return 0;
    }).reduce((s, s2) -> (s + s2));

    System.out.println("Pi is roughly " + 4.0 * count / n);

    
    System.out.println("=========== test start =================================");
    test(jsc);
    System.out.println("=========== test end =================================");
    
    System.out.println("=========== test2 start =================================");
    test2(jsc);
    System.out.println("=========== test2 end =================================");
    
    System.out.println("=========== test3 start =================================");
    test3(jsc);
    System.out.println("=========== test3 end =================================");

    System.out.println("=========== test4 start =================================");
    test4(jsc);
    System.out.println("=========== test4 end =================================");

    System.out.println("=========== test5 start =================================");
    test5(jsc);
    System.out.println("=========== test5 end =================================");

    System.out.println("=========== test6 start =================================");
    test6(jsc);
    System.out.println("=========== test6 end =================================");

    System.out.println("=========== test7 start =================================");
    test7(jsc);
    System.out.println("=========== test7 end =================================");

    System.out.println("=========== test8 start =================================");
    test8(jsc);
    System.out.println("=========== test8 end ================================="); 

    System.out.println("=========== test9 start =================================");
    test9(jsc);
    System.out.println("=========== test9 end =================================");

    System.out.println("=========== test10 start =================================");
    test10(jsc);
    System.out.println("=========== test10 end =================================");

    System.out.println("=========== test11 start =================================");
    test11(jsc);
    System.out.println("=========== test11 end =================================");

    System.out.println("=========== test12 start =================================");
    test12(jsc);
    System.out.println("=========== test12 end =================================");

    System.out.println("=========== test13 start =================================");
    test13(jsc);
    System.out.println("=========== test13 end =================================");

    System.out.println("=========== test14 start =================================");
    test14(jsc);
    System.out.println("=========== test14 end =================================");

    System.out.println("=========== test15 start =================================");
    test15();
    System.out.println("=========== test15 end =================================");

    System.out.println("=========== test16 start =================================");
    test16(jsc);
    System.out.println("=========== test16 end =================================");

    System.out.println("=========== test17 start =================================");
    test17(jsc);
    System.out.println("=========== test17 end =================================");

    System.out.println("=========== test18 start =================================");
    test18(jsc);
    System.out.println("=========== test18 end =================================");

    System.out.println("=========== test19 start =================================");
    test19(jsc);
    System.out.println("=========== test19 end =================================");
    
    jsc.stop();
    jsc.close();
    
    System.out.println("end................");
  }
  
  // List를 RDD로 변환하고 map연상을 통해서 x*x한 값으로 구성된 JavaRDD를 만들어서 화면에 출력 
  static void test (JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  JavaRDD<Integer> result = rdd.map((x) -> {return x*x;});
	  
	  System.out.println("result ==>"+StringUtils.join(result.collect(), ","));
  }
  
  // 문자열을 space로 분리(람다식을 이용함)하여 화면에 출력
  static void test2 (JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi"));
	  JavaRDD<String> words = lines.flatMap( (line) -> {return Arrays.asList(line.split(" ")).iterator();});
	  
	  System.out.println("result ==>"+StringUtils.join(words.collect(), ","));
  }
  
  
  // RDD의 함수인 disticnt, union, intersection, subtract, cartesian, countByValue를 적용하여 화면에 결과 출력 
  static void test3 (JavaSparkContext sc) {
	  JavaRDD<String> data1 = sc.parallelize(Arrays.asList("coffee", "coffee", "panda", "monkey", "tea"));
	  JavaRDD<String> data2 = sc.parallelize(Arrays.asList("coffee", "monkey", "kitty"));
	  JavaRDD<Integer> data3 = sc.parallelize(Arrays.asList(1,2,3));
	  
	  System.out.println("distinct ==>"+data1.distinct().collect());
	  System.out.println("union ==>"+data1.union(data2).collect());
	  System.out.println("intersection ==>"+data1.intersection(data2).collect());
	  System.out.println("subtract ==>"+data1.subtract(data2).collect());
	  System.out.println("cartesion ==>"+data1.cartesian(data3).collect());
	  System.out.println("countByValue ==>"+data1.countByValue());
  }
  
  
  // persist를 사용하여 reduce, fold등를 적용해보고, double형의 RDD로 변환하여 mean값을 구하여 출력함
  static void test4(JavaSparkContext sc) {
	  JavaRDD<Integer> data1 = sc.parallelize(Arrays.asList(1,2,3,4));
	  JavaRDD<Integer> data2 = sc.parallelize(Arrays.asList(3,4,5));
	  
	  List<Integer> data3 = new ArrayList<Integer>();
	  data3.add(1);
	  data3.add(2);
	  data3.add(3);
	  data3.add(4);
	  
	  JavaRDD<Integer> map = data1.map(x -> x+1);
	  map.persist(StorageLevel.MEMORY_AND_DISK());
	  
	  Function2<Integer, Integer, Integer> reduce = new Function2<Integer, Integer, Integer>() {
		 public Integer call(Integer x, Integer y) {
			 return x+y;
		 }
	  };
	  
	  DoubleFunction<Integer> df = new DoubleFunction<Integer>() {
		  public double call(Integer x) {
			  return (double) x;
		  }
	  };
	  
	  System.out.println("map==>"+data1.map(x -> x+1).reduce((x, y)->{return x+y;}));
	  
	  System.out.println("fold==>"+map.fold(0, reduce));
	  
	  map.foreach((x)->System.out.println(x));
	  
	  JavaDoubleRDD result = data1.mapToDouble((x) -> x);
	  System.out.println("mean ===>"+result.mean());
	  result.foreach((x) -> System.out.println(x));
	  
	  System.out.println("--------------------------");
	  
	  JavaDoubleRDD result2 = map.mapToDouble(df);
  	  System.out.println("mean by DoubleFuntion()===>"+result2.mean());
	  result2.foreach((x) -> System.out.println(x)); 

  }
  
  
  // 숫자형의 JavaRDD를 이용하여 aggregate함수를 사용하는 예제
  static void test5(JavaSparkContext sc) {
	  JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));
	  
	  Function2<AvgCount, Integer, AvgCount> addAndCount =
			  new Function2<AvgCount, Integer, AvgCount> (){
				private static final long serialVersionUID = 122222L;

				public AvgCount call(AvgCount a, Integer x) {
			  		a.total += x;
			  		a.num += 1;
			  		return a;
			  	}
			  };

	  Function2<AvgCount, AvgCount, AvgCount> combine =
	  new Function2<AvgCount, AvgCount, AvgCount> (){
		private static final long serialVersionUID = 11111L;

		public AvgCount call(AvgCount a, AvgCount b) {
	  		a.total += b.total;
	  		a.num += b.num;
	  		return a;
	  	}
	  };

	  AvgCount initial = new AvgCount(0, 0);
	  AvgCount result = rdd.aggregate(initial, addAndCount, combine);
      System.out.println(result.avg());
  }
  
  
  // 문자열 RDD를 key/value형태로 바꾸고 key를 기준을 작동하는 sortByKey, reduceByKey, groupByKey, sortByKey를 적용해보는 예제
  static void test6(JavaSparkContext sc) {
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "hi", "hi park", "dk"));
	  
	PairFunction<String, String, Integer> keyData = 
		new PairFunction<String, String, Integer>() {
		public Tuple2<String, Integer> call(String x) {
			return new Tuple2(x.split(" ")[0], x.length());
		}
	};
	
	JavaPairRDD<String, Integer> pairs = lines.mapToPair(keyData);
	pairs.foreach(x->System.out.println(x));
	
	JavaPairRDD<String, Integer> reduceByKey = pairs.reduceByKey(   (x, y) -> { return (x+y);} );
	JavaPairRDD<String, Iterable<Integer>> groupByKey = pairs.groupByKey();

	JavaPairRDD<String, Integer> sortByKey = pairs.sortByKey(false);
	
	System.out.println("reduceByKey =>"+reduceByKey.collect() );
	
	System.out.println("groupByKey =>"+groupByKey.collect() );
	
	System.out.println("sortBykey => "+sortByKey.collect() );
  }

  //   Tuple2형의 RDD를 이용하여 mapToPair를 적용하여 JavaPairRDD를 만들면서 각각의 RDD를 출력해보는 예제
  //  두개의 JavaPairRDD를 이용하여 subtractByKey, subtract, rightOuterJoin, leftOuterJoin, cogroup를 적용해보는 예제 
  static void test7(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  data2.add(new Tuple2("c",4));
	  
	  JavaRDD<Tuple2<String, Integer>> pdataa1 = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdataa11 = pdataa1.mapToPair(  (x) -> {return new Tuple2(x._1, x._2);}  );
	  
	  System.out.println("pdataa1 ==>"+pdataa1);
	  System.out.println("pdataa11 ==>"+pdataa11);
	  
	  System.out.println("pdataa1 ==>"+pdataa1.collect());
	  System.out.println("pdataa11 ==>"+pdataa11.collect());
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, Integer> pdata2 = sc.parallelizePairs(data2);
	  
	  System.out.println("pdata1 ==>"+pdata1.collect());
	  System.out.println("pdata2 ==>"+pdata2.collect());

	System.out.println("subtractByKey =>"+pdataa11.subtractByKey(pdata2).collect());
	System.out.println("subtract =>"+pdata1.subtract(pdata2).collect());
	
	System.out.println("join =>"+pdata1.join(pdata2).collect());
	System.out.println("rightOuterJoin =>"+pdata1.rightOuterJoin(pdata2).collect());
	System.out.println("leftOuterJoin =>"+pdata1.leftOuterJoin(pdata2).collect());
	
	System.out.println("cogroup =>"+pdata1.cogroup(pdata2).collect());

	Function<Integer, Integer> ff = new Function<Integer, Integer>() {
		private static final long serialVersionUID = 11234L;
		int sum = 100;
		public Integer call (Integer x) {
			sum += x;
			return sum;
		}
	};
	
	System.out.println("mapValues =>" + pdata1.mapValues(ff).collect()); 
	
	//System.out.println("mapValues 1=>" + pdata1.reduce( (x2._2) -> {return x2._2;})           );
	System.out.println("reduce =>" + pdata1.reduce( (x2, y2) -> {return new Tuple2("sum of all elements", (x2._2+ y2._2) );})           );
  }
  
  // String형의 List를 JavaRDD를 만들고 이를 JavaPairRDD로 변환후 reduceByKey를 적용하는 예제
  static void test8(JavaSparkContext sc) {
	  List<String> data1 = new ArrayList<String>();
	  data1.add("ab");
	  data1.add("abcd");
	  data1.add("ab");
	  data1.add("cd");
	  
	  JavaRDD<String> pdata = sc.parallelize(data1);
	  
	  JavaPairRDD<String, Integer> pdata1 = pdata.mapToPair(x-> {return new Tuple2(x, 1);});
	  System.out.println("mapToPair==>"+pdata1.collect());
	  
	  JavaPairRDD<String, Integer> pdata2 = pdata1.reduceByKey( (x, y)-> { return (x+y); } );
	  System.out.println("reduceByKey==>"+pdata2.collect() );

  }
  
  //   sc.parallelizePairs를 이용하여  JavaPairRDD를 만들고 combineByKey 적용하고 Map으로 collect후에 출력하는 예제
  static void test9(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));
	  data1.add(new Tuple2("a", 10));
	  data1.add(new Tuple2("c", 9));
	  
	  JavaPairRDD<String, Integer> pdata = sc.parallelizePairs(data1);
	  
	  Function<Integer, AvgCount2> createCombiner = new Function<Integer, AvgCount2>() {
			public AvgCount2 call(Integer x) {
				return new AvgCount2(x, 1);
			}
		};

		Function2<AvgCount2, Integer, AvgCount2> mergeValue =
			new Function2<AvgCount2, Integer, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, Integer x) {
				a.total_ += x;
				a.num_ += 1;
				return a;
			}
		};

		Function2<AvgCount2, AvgCount2, AvgCount2> mergeCombiner =
			new Function2<AvgCount2, AvgCount2, AvgCount2>() {
			public AvgCount2 call(AvgCount2 a, AvgCount2 b) {
				a.total_ += b.total_;
				a.num_ += b.num_;
				return a;
			}
		};
	  
	  JavaPairRDD<String, AvgCount2> avgCounts =  pdata.combineByKey(createCombiner, mergeValue, mergeCombiner);
	  Map<String, AvgCount2> countMap = avgCounts.collectAsMap();

	  for(Entry<String, AvgCount2> entry : countMap.entrySet()) {
	  	System.out.println(entry.getKey() + ":" + entry.getValue().avg());
	  }
  
  }
  
  // 두개의 Tuple2형 List를 이용하여 sc.paralledizePairs를 이용하여 JavaPairRDD로 변환하고 leftOuterJoin를 적용하는 예제
  static void test10(JavaSparkContext sc) {
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  data1.add(new Tuple2("x", 10));	  
	  data1.add(new Tuple2("a", 1));
	  data1.add(new Tuple2("b", 1));

	  
	  
	  List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
	  data2.add(new Tuple2("a", "aa"));
	  data2.add(new Tuple2("b", "bb"));
	  
	  JavaPairRDD<String, Integer> pdata1 = sc.parallelizePairs(data1);
	  JavaPairRDD<String, String> pdata2 = sc.parallelizePairs(data2);
	  
	  pdata1.sortByKey(true);
	  
	 JavaPairRDD<String, Tuple2<Integer,Optional<String>>> result = pdata1.leftOuterJoin(pdata2);

	 System.out.println("pdata1==>"+pdata1.collect());
	 System.out.println("pdata2==>"+pdata2.collect());
	 
	 System.out.println("result==>"+result.collect());
  
  }
  
  // test.json파일을 일거서 메모리에 적재하고 데이타를 파싱하여 Person1 객체에 담아 partition별로 map작업을 수행고 HDFS에 저장하는 예제  
  static void test11(JavaSparkContext sc) {
	  String dir = "formatted-out";
	  
   	 JavaRDD<String> input = sc.textFile("file:///tmp/test.json", 5).persist(StorageLevel.MEMORY_ONLY_2());
   	 
   	 JavaRDD<Person1> result = input.mapPartitions(new ParseJson());
   	 
   	 System.out.println("persons from json ===>"+result.collect());
   	 
   	 JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
   	 
   	 delete_dir(dir);
   	 //mkdir_dir();
   	
   	 formatted.saveAsTextFile(dir);
  
  }
  
  // Tuple2형의 List를 key/value형의 JavaPairRDD로 변환하고 ConvertToWritableTypes 객체를 이용하여 IntWritable형으로 변환하여
  // SequenceFileOutputFormat으로 저장하는 예제
  static void test12(JavaSparkContext sc) {
	  String dir = "sequence-write";
	  
	  List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
	  
	  data1.add(new Tuple2("a",2));
	  data1.add(new Tuple2("c",4));
	  data1.add(new Tuple2("c",6));
	  
	  JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(data1);
	  
	  JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
	  
	  delete_dir(dir);
	  
	  System.out.println("Native Values before ==>"+data1.toString());
	   	 
	  result.saveAsNewAPIHadoopFile(dir, Text.class, IntWritable.class,	SequenceFileOutputFormat.class);
	  System.out.println("Saved as SequenceFileOutputFormat.class");
  }
  
  // Writable형의 저장되어 있는 sequence format의 data를 읽어서 원래의 값으로 변환하는 예제
  static void test13(JavaSparkContext sc) {
	  String fileName = "sequence-write";
	  JavaPairRDD<Text, IntWritable> input = sc.sequenceFile(fileName, Text.class, IntWritable.class);
	  JavaPairRDD<String, Integer> result = input.mapToPair(new ConvertToNativeTypes());
	  
	  System.out.println("Native Values after ====>"+result.collect());
  }
  
  
  // HIveContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test14(JavaSparkContext sc) {
	  HiveContext ctx = new HiveContext(sc);
	  
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #1 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // SparkSession을 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0이상에서는 이것만 유효함)
  static void test15() {
	  SparkSession session = new SparkSession.Builder().appName("SparkJoinExample").master("local").enableHiveSupport().getOrCreate();
      Dataset<Row> dset = session.sql("select * from default.test_table");
      
	  System.out.println("select result #2 =>"+dset.toJavaRDD().collect());
  }

  // SQLContext를 이용하여 hive테이블에 접근하여 데이타를 읽어 출력하는 예제(Spark 2.0기준으로 deprecated됨)
  static void test16(JavaSparkContext sc) {
	  SQLContext ctx = new SQLContext(sc);
	  Dataset<Row>  rows = ctx.sql("select * from default.test_table");
  
	  Dataset<String> stringsDS = rows.map(new MapFunction<Row, String>() {
		  @Override
		  public String call(Row row) throws Exception {
			  //return "Key : "+row.get(0) + ", Value : "+row.get(1);
			  return "Value : "+row.get(0);
		  }
	  }, Encoders.STRING());
	  
	  System.out.println("select result #3 =>"+stringsDS.toJavaRDD().collect());
  }
  
  // accumulator 변수를 사용하는 예제
  static void test17(JavaSparkContext sc) {
	  final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
				
	  			delete_dir("output.txt");
	  
				callSigns.saveAsTextFile("output.txt");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // accumulator와 broadcast변수를 동시에 사용하는 예제
  static void test18(JavaSparkContext sc) {

      final Accumulator<Integer> blankLines = sc.accumulator(0);
	  
	  JavaRDD<String> lines  = sc.parallelize(Arrays.asList("hello world", "", "hi park", "", "sss ff"));
	  
	  JavaRDD<String> callSigns = lines.flatMap(
			  new FlatMapFunction<String, String>() {
				  public Iterator<String> call(String line) {
					  if(line.equals("")) {
						  blankLines.add(1);
					  }
					  System.out.println("str in broadcasted  ==>"+temp.value());
					 return Arrays.asList(line.split(" ")).iterator();
				  }
				});
	  
	  			delete_dir("output.txt2");
				
				callSigns.saveAsTextFile("output.txt2");
				System.out.println("Blank lines: "+blankLines.value());
  }
  
  // 문자형 List를 JavaRDD로 변환하고 mapToDobule를 이용하여 double형으로 변환하고 집계함수인 sum, mean, variance, stdev등을 적용해보는 예제
  static void test19(JavaSparkContext sc) {
	  JavaRDD<String> age = sc.parallelize(Arrays.asList("1","2","3","4","5"));
	  JavaDoubleRDD doubleAge = age.mapToDouble(new DoubleFunction<String>() {
		  public double call(String value) {
			  return Double.parseDouble(value);
		  }});
	  
	  System.out.println("sum = "+doubleAge.sum());
	  System.out.println("mean ="+doubleAge.mean());
	  System.out.println("variance ="+doubleAge.variance());
	  System.out.println("stdev ="+doubleAge.stdev());
  }
 
  // Tuple2<Text, IntWritable>값을 받아서 String형 key와 Integer형 value로 변환하는 클래스
  static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
	  public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
		  return new Tuple2(record._1.toString(), record._2.get());
	  }
  }
  
  // String형의 key와 Integer형의 Tuple2를 받아서 Text형의 key와 IntWritable형의 Tuple2값으로 변환하는 클래스  
  static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
	  public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
		  return new Tuple2(new Text(record._1), new IntWritable(record._2));
	  }
  }
  
  // HDFS상의 폴더밑 하위 파일을 지우는 함수
  static void delete_dir(String ff) {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name",  "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
	  
		  System.out.println("Home Path : " + dfs.getHomeDirectory());
		  System.out.println("Work Path : " + dfs.getWorkingDirectory());
		  
		  Path dir = new Path (ff);
		  if(dfs.exists(dir)) {
			  dfs.delete(dir, true);
		  }
	  } catch (Exception e) {
		  System.out.println("delete dir error ==>"+e);
	  }
  }
  
  // HDFS상의 폴더를 생성하는 함수
  static void mkdir_dir() {
	  Configuration conf = new Configuration();
	  conf.set("fs.default.name", "hdfs://mycluster");
	  try {
		  FileSystem dfs = FileSystem.get(conf);
		  
		  Path dir = new Path ("formatted-out");
		  if( ! dfs.exists(dir)) {
			  dfs.mkdirs(dir);
		  }
	  } catch (Exception e) {
		  System.out.println("mk dir error ==>"+e);
	  }
  }
  
  
  
}


// json데이타를 Person1형의 iterator로 변환하는 클래스
class ParseJson implements FlatMapFunction<Iterator<String>, Person1> {
	public Iterator<Person1> call(Iterator<String> lines) throws Exception {
		ArrayList<Person1> people = new ArrayList<Person1>();
		
		Gson mapper = new Gson();
		while (lines.hasNext()) {
			String line = lines.next();
			try {
				System.out.println("line => "+line);
				Person1 person1 = mapper.fromJson(line, Person1.class);
				System.out.println("person1=>"+person1);
				people.add(person1);
			} catch (Exception e) {
				// 무시함
			}
		}
		return people.iterator();
	}
}

// Person1형의 데이타를 String형의 iterator로 변환하는 클래스
class WriteJson implements FlatMapFunction<Iterator<Person1>, String> {
	public Iterator<String> call(Iterator<Person1> people) throws Exception {
		ArrayList<String> text = new ArrayList<String>();
		while(people.hasNext()) {
			Person1 person = people.next();
			text.add("new string =>"+person.toString());
		}
		return text.iterator();
	}
}

class Person1 implements Serializable {
	String name;
	int age;
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "Person1 [name=" + name + ", age=" + age + "]";
	}
	
}

// 평균값을 구하는 클래스 1
class AvgCount implements Serializable {
	private static final long serialVersionUID = 134444L;
	public AvgCount (int total, int num) {
		this.total = total;
		this.num = num;
	}
	
	public int total;
	public int num;
	public double avg() {
		return total / (double) num;
	}
}


//평균값을 구하는 클래스 2
class AvgCount2 implements Serializable {
	private static final long serialVersionUID = -1683922668212126392L;
	public AvgCount2(int total, int num) { total_= total; num_=num;}
	
	public int total_;
	public int num_;
	public float avg() {
		return total_ / (float) num_;
	}
}


번호 제목 날짜 조회 수
384 Kafka의 API중 Consumer.createJavaConsumerConnector()를 이용하고 다수의 thread를 생성하여 Kafka broker의 topic에 접근하여 데이타를 가져오고 처리하는 예제 소스 2017.04.26 510
383 Spark에서 KafkaUtils.createStream()를 이용하여 이용하여 kafka topic에 접근하여 객채로 저장된 값을 가져오고 처리하는 예제 소스 2017.04.26 487
382 Hbase API를 이용하여 scan시 페이징을 고려하여 목록을 가져올때 사용할 수 있는 로직의 예시를 보여줌 2017.04.26 981
381 linux에서 특정 포트를 사용하는 프로세스 확인하기 2017.04.26 734
380 Spark에서 Serializable관련 오류및 조치사항 2017.04.21 5295
379 Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging 발생시 조치사항 2017.04.19 945
378 [Jsoup]특정페이지를 jsoup을 이용하여 파싱하는 샘플소스 2017.04.18 947
377 [jsoup]Jsoup Tutorial 2017.04.11 411
376 update를 많이 하면 heap memory가 많이 소진되고 최종적으로 OOM가 발생하는데 이에 대한 설명 2017.04.10 993
375 LUBM 개수별 hadoop HDFS data사이즈 정리 2017.04.06 292
374 Container killed by the ApplicationMaster. Container killed on request. Exit code is 143 Container exited with a non-zero exit code 143 TaskAttempt killed because it ran on unusable node 오류시 조치방법 2017.04.06 1084
373 protege 설명및 사용법 file 2017.04.04 3455
372 streaming작업시 입력된 값에 대한 사본을 만들게 되는데 이것이 실패했을때 발생하는 경고메세지 2017.04.03 853
371 [메모리 덤프파일 분석] 2017.03.31 308
370 JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 2017.03.30 293
369 nc -l 7777 : 7777포트에서 입력을 받는다. 2017.03.23 651
368 kafka-manager 1.3.3.4 설정및 실행하기 2017.03.20 2366
» spark 2.0.0의 api를 이용하는 예제 프로그램 2017.03.15 384
366 It is indirectly referenced from required .class files 오류 발생시 조치방법 2017.03.09 974
365 spark2.0.0에서 hive 2.0.1 table을 읽어 출력하는 예제 소스(HiveContext, SparkSession, SQLContext) 2017.03.09 391
위로