// Cloudera CDH/CDP 및 Hadoop EcoSystem, Semantic IoT 등의 개발/운영 기술을 정리합니다. gooper@gooper.com로 문의 주세요.
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
 * Spark Streaming application that counts log lines received over a TCP socket
 * using a sliding window.
 *
 * Every 2 seconds it prints the number of lines received during the preceding
 * 5 seconds.
 *
 * Usage: StreamingLogsMB <hostname> <port> [checkpointDir]
 *
 * `checkpointDir` is optional and defaults to `logcheckpt` (a path relative to
 * the working directory).
 */
object StreamingLogsMB {

  /** Checkpoint directory used when none is given on the command line. */
  private val DefaultCheckpointDir = "logcheckpt"

  /** Command-line usage line printed on invalid arguments. */
  private val Usage = "Usage: stubs.StreamingLogsMB <hostname> <port> [checkpointDir]"

  /** Prints `message` (when non-empty) and the usage line to stderr, then exits with status 1. */
  private def exitWithUsage(message: String = ""): Nothing = {
    if (message.nonEmpty) System.err.println(message)
    System.err.println(Usage)
    sys.exit(1)
  }

  /**
   * Entry point.
   *
   * @param args `hostname`, `port` and an optional checkpoint directory
   */
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    if (args.length < 2) exitWithUsage()

    // Hostname and port of the data source, taken from the application arguments.
    val hostname = args(0)
    // Reject non-numeric or out-of-range ports with a usage message
    // instead of failing with a raw NumberFormatException.
    val port = Try(args(1).toInt).toOption
      .filter(p => p >= 1 && p <= 65535)
      .getOrElse(exitWithUsage(s"Invalid port: ${args(1)}"))
    val checkpointDir = args.lift(2).getOrElse(DefaultCheckpointDir)

    // The no-arg constructor loads settings from system properties (e.g. those set by spark-submit).
    val sc = new SparkContext()
    // Set log level to ERROR to avoid distracting extra output.
    sc.setLogLevel("ERROR")

    // 1-second batch interval. The window and slide durations below must be multiples of it.
    val ssc = new StreamingContext(sc, Seconds(1))

    // DStream of log lines read from the given host and port.
    val logs = ssc.socketTextStream(hostname, port)

    // countByWindow keeps a running count using an inverse reduce, which requires checkpointing.
    ssc.checkpoint(checkpointDir)

    // Every 2 seconds, print the number of lines received in the last 5 seconds.
    logs.countByWindow(Seconds(5), Seconds(2)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}