메뉴 건너뛰기

Cloudera, BigData, Semantic IoT, Hadoop, NoSQL

Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.


import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object StreamingLogsMB {
  def main(args: Array[String]) {
    if (args.length < 2) {

      System.err.println("Usage: stubs.StreamingLogsMB <hostname> <port>")
      System.exit(1)
    }

    // get hostname and port of data source from application arguments
    val hostname = args(0)
    val port = args(1).toInt

    // Create a Spark Context
    val sc = new SparkContext()

    // Set log level to ERROR to avoid distracting extra output
    sc.setLogLevel("ERROR")

    // Configure the Streaming Context with a 1 second batch duration
    val ssc = new StreamingContext(sc,Seconds(1))

    // Create a DStream of log data from the server and port specified
    val logs = ssc.socketTextStream(hostname,port)

    ssc.checkpoint("logcheckpt")

    logs.countByWindow(Seconds(5), Seconds(2)).print

    ssc.start()
    ssc.awaitTermination()
  }
}

위로