Spark Streaming (Scala)
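Steps:
1. Open a terminal.
2. Run `nc -l 54321` to start a listening socket (use the same port number as in the program).
3. Run the code below.
4. Type some text into the terminal that is running `nc`.
5. Observe the word counts that the program prints for each 10-second batch.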
build.sbt
// Build definition for the Spark Streaming word-count example.
name := "MachineLearning"
version := "0.1"
// Spark 2.4.x targets the Scala 2.11 line; use its final patch release
// rather than the very first one.
scalaVersion := "2.11.12"
// Single source of truth for every Spark module version below.
val sparkVersion = "2.4.7"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // Declared explicitly because the program imports org.apache.spark.streaming
  // directly; previously it was only available as a transitive dependency.
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion
)
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.24"
version := "0.1"
// Spark 2.4.x targets the Scala 2.11 line; use its final patch release
// rather than the very first one.
scalaVersion := "2.11.12"
// Single source of truth for every Spark module version below.
val sparkVersion = "2.4.7"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // Declared explicitly because the program imports org.apache.spark.streaming
  // directly; previously it was only available as a transitive dependency.
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion
)
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.24"
StreamingNetworkWordCount.scala
package SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Counts the words received on a TCP text socket in 10-second batches and
 * prints the counts of each batch to standard output.
 *
 * Usage: `StreamingNetworkWordCount [host] [port]`
 *
 * Both arguments are optional; they default to `localhost` and `54321`,
 * which matches a listener started with `nc -l 54321`.
 */
object StreamingNetworkWordCount {

  /**
   * Program entry point.
   *
   * @param args optional `host` (index 0) and `port` (index 1) of the text source
   * @throws IllegalArgumentException if a port is supplied but is not an integer
   */
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    val host = args.headOption.getOrElse("localhost")
    val port = args.lift(1).fold(54321) { raw =>
      Try(raw.toInt).getOrElse(
        throw new IllegalArgumentException(s"Invalid port: $raw")
      )
    }

    // local[2] stays the default (the socket receiver occupies one thread, so
    // a second is needed for processing), but a master supplied externally,
    // e.g. via spark-submit --master, is no longer overridden.
    val sparkConf = new SparkConf()
      .setAppName("NetworkWordCount")
      .setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = ssc.socketTextStream(host, port)
    // Split on runs of whitespace and drop empty tokens so that blank lines
    // and repeated spaces are not counted as the word "".
    val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Counts the words received on a TCP text socket in 10-second batches and
 * prints the counts of each batch to standard output.
 *
 * Usage: `StreamingNetworkWordCount [host] [port]`
 *
 * Both arguments are optional; they default to `localhost` and `54321`,
 * which matches a listener started with `nc -l 54321`.
 */
object StreamingNetworkWordCount {

  /**
   * Program entry point.
   *
   * @param args optional `host` (index 0) and `port` (index 1) of the text source
   * @throws IllegalArgumentException if a port is supplied but is not an integer
   */
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    val host = args.headOption.getOrElse("localhost")
    val port = args.lift(1).fold(54321) { raw =>
      Try(raw.toInt).getOrElse(
        throw new IllegalArgumentException(s"Invalid port: $raw")
      )
    }

    // local[2] stays the default (the socket receiver occupies one thread, so
    // a second is needed for processing), but a master supplied externally,
    // e.g. via spark-submit --master, is no longer overridden.
    val sparkConf = new SparkConf()
      .setAppName("NetworkWordCount")
      .setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val lines = ssc.socketTextStream(host, port)
    // Split on runs of whitespace and drop empty tokens so that blank lines
    // and repeated spaces are not counted as the word "".
    val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
No comments:
Post a Comment