메뉴 건너뛰기

Bigdata, Semantic IoT, Hadoop, NoSQL

Bigdata, Hadoop ecosystem, Semantic IoT등의 프로젝트를 진행중에 습득한 내용을 정리하는 곳입니다.
필요한 분을 위해서 공개하고 있습니다. 문의사항은 gooper@gooper.com로 메일을 보내주세요.


console창을 두개 띄우고 한쪽에는 아래의 소스를 실행

(예, $HOME/spark/bin/spark-submit
--master spark://sda1:7077,sda2:7077
--driver-memory 2g
--executor-memory 3g
--class com.gooper.icbms.sda.kafka.onem2m.JavaSparkTest
sda-client-2.0.jar)


시키고 다른 쪽에는 nc -l 7777을 실행하고 문자열을 입력하여 7777포트에 stream을 발생시켜준다.


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.DoubleFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.gson.Gson;

public final class JavaStreamingContextTest {

  public static void main(String[] args) throws Exception {
	  
	System.out.println("start(JavaStreamingContextTest)................");

    System.out.println("=========== test21 start =================================");
    test21();
    System.out.println("=========== test21 end =================================");
    

    
    System.out.println("end(JavaStreamingContextTest)................");
  }

  
// localhost:7777에서 들어오는 stream data에서 입력된 문자열을 기준으로 동일 문자열의 개수를 카운트한다.
static void test21()  {
	  SparkConf sc=new SparkConf().setAppName("JavaStreamingContextTest");
	  JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
	  jssc.checkpoint("/tmp");
	  JavaDStream<String> lines = jssc.socketTextStream("sda1", 7777);
	  	  
	  // error가 있으면 출력
	  JavaDStream<String> errorLines  = lines.filter(new Function<String, Boolean>() {
		  public Boolean call(String line) {
			  return line.contains("error");
		  }
	  });
	  errorLines.print();
	  
	  // 문자카운트
	  JavaPairDStream<String, Integer> rst = lines.mapToPair(
			  new PairFunction<String, String, Integer>() {
				  public Tuple2<String, Integer> call(String line) {
					  return new Tuple2(line, 1);
				  }
			  }).updateStateByKey(new UpdateRunningSum());

	  // 람다식으로 처리할 경우
	  //JavaPairDStream<String, Integer> rst = lines.mapToPair( (line)->new Tuple2<String, Integer>(line, 1)).updateStateByKey(new UpdateRunningSum());

	  rst.print();
	  
	  jssc.start();
	  try { 
		  jssc.awaitTermination();
	  } catch (Exception e) {
		  System.out.println("exception 2: "+e.getMessage());
	  }
}

}
	
class UpdateRunningSum implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
	public Optional<Integer> call(List<Integer> values, Optional<Integer> current) {
		int newSum = current.orElse(0);
		for(int value : values) {
			newSum += value;
		}
		return Optional.of(newSum);
	}
};


번호 제목 글쓴이 날짜 조회 수
261 tablet별 disk사용량 확인하는 방법 총관리자 2021.08.27 133
» JavaStreamingContext를 이용하여 스트림으로 들어오는 문자열 카운트 소스 총관리자 2017.03.30 132
259 streaming작업시 입력된 값에 대한 사본을 만들게 되는데 이것이 실패했을때 발생하는 경고메세지 총관리자 2017.04.03 130
258 producer / consumer구현시 설정 옵션 설명 총관리자 2016.10.19 130
257 Ubuntu에서 sbt및 scala설치하기 총관리자 2017.06.20 129
256 ./hadoop-daemon.sh start namenode로 namenode기동시 EditLog의 custerId, namespaceId가 달라서 발생하는 오류 해결방법 총관리자 2016.09.24 127
255 start-all.sh로 spark데몬 기동시 "JAVA_HOME is not set"오류 발생시 조치사항 총관리자 2016.08.01 127
254 select와 group by결과 값이 없는경우의 리턴 값이 다름 file 총관리자 2016.02.05 127
253 protege 4.3 다운로드 총관리자 2015.12.09 126
252 console명령과 API비교 총관리자 2015.12.21 125
251 VPS에서는 root로 실행해도 swap파일을 만들지 못하게 만들어 두었지만 swap파일을 생성하는 방법 총관리자 2017.06.20 124
250 solrcloud에 solrdf1.1설치하고 테스트 하기 총관리자 2016.04.22 124
249 kudu 테이블 metadata강제 삭제시 발생하는 오류 메세지 총관리자 2022.01.12 123
248 하둡 클러스터 전체 노드를 다시 기동하면 invalidate metadata를 수행해야 데이터가 틀어지지 않는다. 총관리자 2019.05.20 122
247 Impala daemon기동시 "Could not create temporary timezone file"오류 발생시 조치사항 총관리자 2018.03.29 121
246 hadoop에서 yarn jar ..를 이용하여 appliction을 실행하여 정상적으로 수행되었으나 yarn UI의 어플리케이션 목록에 나타나지 않는 문제 총관리자 2017.05.02 121
245 lagom에서 제공하는 초기 생성기능을 이용하여 생성한 프로젝트의 소스 파악 총관리자 2018.01.16 119
244 Collections.sort를 이용한 List<User>형태의 데이타 정렬(숫자, 문자에 대해서 각각 asc/desc및 복합정렬) 총관리자 2016.12.15 119
243 hive의 메타정보 테이블을 MariaDB로 사용하는 경우 table comment나 column comment에 한글 입력시 깨지는 경우 utf8로 바꾸는 방법. gooper 2023.03.10 118
242 impala external 테이블 생성시 컬럼과 라인 구분자를 지정하여 테이블 생성하는 예시 총관리자 2020.02.20 118

A personal place to organize information learned during the development of such Hadoop, Hive, Hbase, Semantic IoT, etc.
We are open to the required minutes. Please send inquiries to gooper@gooper.com.

위로