메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 날짜 조회 수
621 select와 group by결과 값이 없는경우의 리턴 값이 다름 file 2016.02.05 210
620 service name방식의 oracle을 메타정보 저장소로 사용할때 Hue Configuration설정하는 방법 2022.02.12 210
» JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 2017.03.30 211
618 [TLS]pkcs12형식의 인증서 생성및 jks형식 인증서 생성 커맨드 예시 2022.03.15 211
617 S2RDF모듈의 실행부분만 추출하여 별도록 실행하는 방법(draft) 2016.06.14 213
616 LUBM 개수별 hadoop HDFS data사이즈 정리 2017.04.06 214
615 kudu rebalance수행 command예시 2022.01.17 218
614 --master yarn 옵션으로 spark client프로그램 실행할때 메모리 부족 오류발생시 조치방법 2016.05.27 219
613 여러가지 방법으로 특정 jar파일을 exclude하지 못하는 경우 해당 jar파일을 제외시키는 방법 2016.08.11 220
612 ./hadoop-daemon.sh start namenode로 namenode기동시 EditLog의 custerId, namespaceId가 달라서 발생하는 오류 해결방법 2016.09.24 220
611 How-to: Tune Your Apache Spark Jobs (Part 2) file 2016.10.31 221
610 LUBM 데이타 생성구문 2017.07.24 221
609 failed to read local state, exiting...오류발생시 조치사항 2016.04.06 222
608 Toree 0.1.0-incubating이 Scala 2.10.4까지만 지원하게 되어서 발생하는 NoSuchMethod오류 문제 해결방법(scala 2.11.x을 지원하지만 오류가 발생할 수 있음) 2018.04.20 223
607 대표 오픈소스 라이선스, 한 눈에 보기! 2015.12.10 226
606 ./spark-sql 실행시 "java.lang.NumberFormatException: For input string: "1s"오류발생시 조치사항 2016.06.09 226
605 protege 4.3 다운로드 2015.12.09 227
604 [Kudu]Schema별 혹은 테이블별 사용량(Replica포함) 구하는 방법 2022.07.14 228
603 [CDP7.1.7]impala-shell수행시 간헐적으로 "-k requires a valid kerberos ticket but no valid kerberos ticket found." 오류 2023.11.16 230
602 컴퓨터 무한 재부팅 원인및 조치방법 file 2017.12.05 232
위로