Open two console windows. In one, submit the source below, e.g.:

$HOME/spark/bin/spark-submit \
  --master spark://sda1:7077,sda2:7077 \
  --driver-memory 2g \
  --executor-memory 3g \
  --class com.gooper.icbms.sda.kafka.onem2m.JavaStreamingContextTest \
  sda-client-2.0.jar

In the other console (on sda1, since the job connects to sda1:7777), run nc -l 7777 and type strings to generate a stream on port 7777. Start nc before submitting the job so the socket receiver can connect right away.
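For example, a session might look like the following. The input strings and the Time: stamps are illustrative; the actual output depends on what you type and on when each 1-second batch fires.

console 1 (sda1):
$ nc -l 7777
apple
apple
error occurred

console 2 (spark-submit output, abbreviated):
-------------------------------------------
Time: ... ms
-------------------------------------------
error occurred

-------------------------------------------
Time: ... ms
-------------------------------------------
(apple,2)
(error occurred,1)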
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
    System.out.println("start(JavaStreamingContextTest)................");
    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    System.out.println("end(JavaStreamingContextTest)................");
  }
  // Counts occurrences of identical strings in the stream data arriving on sda1:7777.
  static void test21() {
    SparkConf sc = new SparkConf().setAppName("JavaStreamingContextTest");
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
    // updateStateByKey needs a checkpoint directory to persist state across batches.
    jssc.checkpoint("/tmp");
    JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
    // Print any lines that contain "error".
    JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
      public Boolean call(String line) {
        return line.contains("error");
      }
    });
    errorLines.print();
    // Count each line, keeping a running total per distinct string across batches.
    JavaPairDStream<String, Integer> rst = lines.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String line) {
            return new Tuple2<>(line, 1);
          }
        }).updateStateByKey(new UpdateRunningSum());
    // The same pipeline written as a lambda expression (full sketch after the listing):
    // JavaPairDStream<String, Integer> rst = lines
    //     .mapToPair(line -> new Tuple2<String, Integer>(line, 1))
    //     .updateStateByKey(new UpdateRunningSum());
    rst.print();
    jssc.start();
    try {
      jssc.awaitTermination();
    } catch (Exception e) {
      System.out.println("exception 2: " + e.getMessage());
    }
  }
}
// Merges each key's new values from the current batch into its running total.
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
  public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
    int newSum = current.orElse(0);
    for (int value : values) {
      newSum += value;
    }
    return Optional.of(newSum);
  }
}
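For reference, here is the whole pipeline written with Java 8 lambdas instead of anonymous inner classes, as the commented-out line above hints. This is a minimal sketch under the same assumptions as the listing (Spark 2.x Java streaming API, socket source on sda1:7777); the class name JavaStreamingContextLambdaTest is illustrative, not from the original. Because Function and Function2 are functional interfaces, the filter and the UpdateRunningSum class both collapse into lambdas:

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public final class JavaStreamingContextLambdaTest {
  public static void main(String[] args) throws Exception {
    SparkConf sc = new SparkConf().setAppName("JavaStreamingContextLambdaTest");
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
    jssc.checkpoint("/tmp");

    JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);

    // Filter as a lambda: print lines containing "error".
    lines.filter(line -> line.contains("error")).print();

    // Running count per distinct line; the update lambda replaces UpdateRunningSum.
    JavaPairDStream<String, Integer> rst = lines
        .mapToPair(line -> new Tuple2<String, Integer>(line, 1))
        .updateStateByKey((List<Integer> values, Optional<Integer> current) -> {
          int newSum = current.orElse(0);
          for (int value : values) {
            newSum += value;
          }
          return Optional.of(newSum);
        });
    rst.print();

    jssc.start();
    jssc.awaitTermination();
  }
}

The explicit parameter types on the updateStateByKey lambda are deliberate: they pin the state type to Integer so the compiler can infer the generic S of updateStateByKey without an extra cast.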