메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 글쓴이 날짜 조회 수
281 LUBM 개수별 hadoop HDFS data사이즈 정리 총관리자 2017.04.06 146
280 ?a는 모두 표시하면서 ?b와 비교하여 ?a=?b는 ""로 하고 ?a!=?b 인경우는 해당값을 가지는 결과 집합을 구하는 경우 file 총관리자 2016.01.29 146
279 LUBM 데이타 생성구문 총관리자 2017.07.24 145
278 서버중 slave,worker,regionserver만 재기동해야 할때 필요한 기동스크립트및 사용방법 총관리자 2017.02.03 145
277 mongodb에서 큰데이타 sort시 오류발생에 대한 해결방법 총관리자 2015.12.22 145
276 [oozie] oozie shell action에서 shellscript수행결과의 2개 변수를 decision 액션에서 사용하기 총관리자 2020.06.05 143
275 --master yarn 옵션으로 spark client프로그램 실행할때 메모리 부족 오류발생시 조치방법 총관리자 2016.05.27 143
274 SQL문장과 Mongo에서 사용하는 명령어를 비교한 것입니다. 총관리자 2015.09.30 143
273 scala application 샘플소스(SparkSession이용) 총관리자 2018.03.07 140
272 Spark Streaming으로 유실 없는 스트림 처리 인프라 구축하기 총관리자 2016.03.11 140
271 바나나 파이의 /tmp폴더를 외장하드로 변경하기 총관리자 2015.07.24 139
270 [TLS]pkcs12형식의 인증서 생성및 jks형식 인증서 생성 커맨드 예시 총관리자 2022.03.15 137
269 jena jar파일실행시 org.apache.jena.tdb.TDB.init에서 java.lang.NullPointerException발생시 조치사항 총관리자 2016.08.19 137
268 한번에 여러값 update하기 총관리자 2016.01.13 137
267 halyard 1.3의 console을 이용하여 100억건의 데이타에 대한 쿼리수행시 ScannerTimeoutException 발생시 조치사항 총관리자 2017.09.06 136
266 like검색한 결과를 기준으로 집계를 수행하는 java 소스 총관리자 2016.12.19 133
265 ./spark-sql 실행시 "java.lang.NumberFormatException: For input string: "1s"오류발생시 조치사항 총관리자 2016.06.09 133
» JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 총관리자 2017.03.30 132
263 [Kudu]Schema별 혹은 테이블별 사용량(Replica포함) 구하는 방법 gooper 2022.07.14 131
262 여러가지 방법으로 특정 jar파일을 exclude하지 못하는 경우 해당 jar파일을 제외시키는 방법 총관리자 2016.08.11 131

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로