Open two console windows. In one, launch the source below, e.g.:

$HOME/spark/bin/spark-submit \
  --master spark://sda1:7077,sda2:7077 \
  --driver-memory 2g \
  --executor-memory 3g \
  --class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest \
  sda-client-2.0.jar

(point --class at the main class you actually run; the class shown below is JavaStreamingContextTest). In the other console, run nc -l 7777 and type strings to generate a stream on port 7777.
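To verify the behavior, type a few lines into the nc console. Every one-second batch, the job prints the lines containing "error" and then the running count per distinct line. Abridged, a session looks roughly like this (timestamps and exact batch layout vary by Spark version; the counts follow from the updateStateByKey running sum in the source below):

nc console (input):
hello
hello
error test

spark-submit console (output):
-------------------------------------------
Time: ... ms
-------------------------------------------
error test

-------------------------------------------
Time: ... ms
-------------------------------------------
(hello,2)
(error test,1)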
Full source:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public final class JavaStreamingContextTest {
    public static void main(String[] args) throws Exception {
        System.out.println("start(JavaStreamingContextTest)................");
        System.out.println("=========== test21 start =================================");
        test21();
        System.out.println("=========== test21 end =================================");
        System.out.println("end(JavaStreamingContextTest)................");
    }

    // Counts occurrences of identical strings in the stream data arriving on sda1:7777.
    static void test21() {
        SparkConf sc = new SparkConf().setAppName("JavaStreamingContextTest");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
        // updateStateByKey requires a checkpoint directory.
        jssc.checkpoint("/tmp");

        JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);

        // Print any line containing "error".
        JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
            public Boolean call(String line) {
                return line.contains("error");
            }
        });
        errorLines.print();

        // Count identical lines.
        JavaPairDStream<String, Integer> rst = lines.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String line) {
                    return new Tuple2<>(line, 1);
                }
            }).updateStateByKey(new UpdateRunningSum());
        // The same mapToPair written as a lambda:
        //JavaPairDStream<String, Integer> rst = lines.mapToPair((line) -> new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

        rst.print();

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (Exception e) {
            System.out.println("exception 2: " + e.getMessage());
        }
    }
}

// Adds the new counts of a batch to the running sum kept per key across batches.
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
    public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
        int newSum = current.orElse(0);
        for (int value : values) {
            newSum += value;
        }
        return Optional.of(newSum);
    }
}
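For reference, the same job can be written more compactly with Java 8 lambdas. A minimal sketch under the same imports and host names as above (test21Lambda is a name introduced here, not part of the original source):

    static void test21Lambda() {
        SparkConf sc = new SparkConf().setAppName("JavaStreamingContextTest");
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
        jssc.checkpoint("/tmp");  // still required by updateStateByKey

        JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);

        // Print any line containing "error".
        lines.filter(line -> line.contains("error")).print();

        // Count identical lines; the state function replaces UpdateRunningSum.
        lines.mapToPair(line -> new Tuple2<String, Integer>(line, 1))
             .updateStateByKey((List<Integer> values, Optional<Integer> current) -> {
                 int newSum = current.orElse(0);
                 for (int v : values) newSum += v;
                 return Optional.of(newSum);
             })
             .print();

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (Exception e) {
            System.out.println("exception: " + e.getMessage());
        }
    }

Because updateStateByKey keeps per-key state across batches, the checkpoint directory is mandatory in both versions; without it the job fails at start-up.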