My code is sequenced as follows:

import org.apache.spark.util.LongAccumulator

// Create the accumulator and register it with the SparkContext
val count = new LongAccumulator
sparksession.sparkContext.register(count, "count accumulator")

// Streaming transformation (the body runs on the executors, once per row)
val DF = fromKafkaDF.map { row =>
  count.add(1)
  println(count.value)  // this value is one
  // some transformation
}.writeStream.outputMode("update").format("console").start()

// Trying to access the value of the accumulator from the driver
println(count.value)  // this value is zero

Why is the value of the accumulator zero in the driver? I have other logic that depends on this accumulator. Please suggest a fix.

Shaido
prady

2 Answers

0

For the accumulator to collect any value, you have to perform an action and then check it. Transformations such as map are lazy, so the accumulator is not updated until an action runs. Please check the link "accumulator explained".
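The point about actions can be illustrated with a small batch job. This is only a sketch under assumptions: it runs Spark in local mode, and the object name `AccumulatorActionDemo`, the `run` helper, and the 1 to 100 input are made up for illustration and do not come from the question.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; not part of the original question.
object AccumulatorActionDemo {

  // Returns the accumulator value seen on the driver before and after an action.
  def run(spark: SparkSession): (Long, Long) = {
    val count = spark.sparkContext.longAccumulator("count accumulator")

    // map is a lazy transformation: nothing is executed here yet.
    val mapped = spark.sparkContext.parallelize(1 to 100).map { x =>
      count.add(1)
      x * 2
    }

    val before = count.sum // no action has run, so nothing has been added

    mapped.count()         // the action runs the map on every element

    val after = count.sum  // task updates have been merged back to the driver
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local mode, for illustration only
      .appName("accumulator-action-demo")
      .getOrCreate()

    val (before, after) = run(spark)
    println(s"before action: $before, after action: $after")
    spark.stop()
  }
}
```

In the streaming code from the question the situation is similar: `start()` returns immediately and the batches are processed in the background, so the `println` on the driver can run before any row has been counted.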

0

Use the accumulator inside a StreamingQueryListener, for example:

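import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.util.LongAccumulator

class TestListner(acc: LongAccumulator) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("onQueryStarted   :" + event.toString)
  }

  // Called on the driver each time the query reports progress
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println(acc)   // print the accumulator as seen by the driver
    acc.reset()    // start from zero for the next progress event
    println("onQueryProgress    :" + event.progress)
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("onQueryTerminated  :" + event)
  }
}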

Then register the listener in your main application:

    spark.streams.addListener(new TestListner(acc))
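
As a rough end-to-end sketch of the listener approach, the following wires an accumulator, a listener, and a streaming query together. Several things here are assumptions for illustration: the built-in `rate` source stands in for the Kafka stream, the names `CountListener`, `ListenerDemo`, `run`, and `lastSeen` are hypothetical, and the listener only reads the running total instead of resetting it.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.util.LongAccumulator

// Hypothetical listener: stores the accumulator's running total on the driver
// every time the query reports progress.
class CountListener(acc: LongAccumulator, lastSeen: AtomicLong)
    extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    lastSeen.set(acc.sum)
    println(s"rows counted so far: ${acc.sum}")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
}

object ListenerDemo {

  // Runs the query for roughly runMillis and returns the last total the listener saw.
  def run(spark: SparkSession, runMillis: Long): Long = {
    import spark.implicits._

    val count = spark.sparkContext.longAccumulator("count accumulator")
    val lastSeen = new AtomicLong(0L)
    val listener = new CountListener(count, lastSeen)
    spark.streams.addListener(listener)

    // Assumption: the "rate" test source replaces the Kafka source of the question.
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select($"value").as[Long]
      .map { v => count.add(1); v } // runs on the executors, once per row
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination(runMillis) // let a few micro-batches complete
    query.stop()
    spark.streams.removeListener(listener)
    lastSeen.get()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local mode, for illustration only
      .appName("listener-demo")
      .getOrCreate()
    println(s"last total seen by the listener: ${run(spark, 10000L)}")
    spark.stop()
  }
}
```

Because the listener callbacks run on the driver after a batch has made progress, this is a place where the accumulator's value can be read for follow-up logic. The exact totals depend on timing, so no output is shown here.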
Alen Peter