JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

My HDFS directory contains JSON files. How can I read them as a stream and convert each batch to a DataFrame?

Krishna

1 Answer


You can use textFileStream to read the files as text and convert them to JSON later. Note that textFileStream only picks up files added to the directory after the stream starts; files already present when it starts are ignored.

val dstream = ssc.textFileStream("path to hdfs directory")

This gives you a DStream[String], which is a sequence of RDD[String]s, one per batch interval.

Then you can work with the RDD for each batch interval:

dstream.foreachRDD { rdd =>
  // convert each batch of JSON strings to a DataFrame
  // (assumes a SparkSession named spark is in scope)
  if (!rdd.isEmpty()) {
    val df = spark.read.json(rdd)
    // apply your transformations on df here
  }
}

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
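
Since your driver code is in Java, here is a rough equivalent sketch reusing the ssc and sqlContext from your snippet. The HDFS path is a placeholder, and it assumes Spark 2.x, where sqlContext.read().json(rdd) (the RDD-based JSON reader, deprecated since 2.2) returns a Dataset<Row>:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.api.java.JavaDStream;

// monitor the HDFS directory for newly arriving files
JavaDStream<String> lines = ssc.textFileStream("hdfs:///path/to/json/dir"); // placeholder path

lines.foreachRDD((JavaRDD<String> rdd) -> {
    if (!rdd.isEmpty()) {
        // parse this batch of JSON strings into a DataFrame
        Dataset<Row> df = sqlContext.read().json(rdd);
        df.show(); // replace with your own transformation
    }
});

ssc.start();
ssc.awaitTermination();

One thing to keep in mind: read().json has to infer the schema for every batch, so if your JSON structure is fixed, passing an explicit schema with read().schema(...) before json(...) avoids that per-batch inference.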

Hope this helps

koiralo