
In my Spark Structured Streaming job, I am trying to read Confluent Avro messages from a Kafka topic and I get "Malformed records are detected in record parsing."

I have tried debugging extensively but cannot figure out which record is malformed. How can I get the record from a row that is malformed? Is there a way to print the Avro message to see what is wrong with it?
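To see what the broker actually delivered, it can help to look at the raw `value` bytes before any Avro decoding. Below is a minimal plain-Scala sketch; `HexDump` and `hexDump` are hypothetical names, not part of Spark. Inside the streaming query itself, selecting `hex($"value")` (from `org.apache.spark.sql.functions`) instead of decoding, and writing that to the console sink, shows the same bytes.

```scala
object HexDump {
  // Hypothetical helper: renders raw Kafka value bytes as space-separated hex,
  // showing at most `limit` bytes.
  def hexDump(bytes: Array[Byte], limit: Int = 32): String = {
    val shown = bytes.take(limit).map(b => f"${b & 0xff}%02x").mkString(" ")
    if (bytes.length > limit) s"$shown ... (${bytes.length} bytes total)" else shown
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical Confluent-framed record: magic byte 0x00, schema ID 7,
    // then the Avro binary encoding of the string "a" (length 1 as zigzag 0x02, then 'a').
    val sample = Array[Byte](0, 0, 0, 0, 7, 2, 'a'.toByte)
    println(hexDump(sample)) // prints "00 00 00 00 07 02 61"
  }
}
```

A value that begins with `00` followed by four more header bytes is probably Confluent-framed (magic byte plus schema ID) rather than bare Avro binary.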

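My code:

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro // Spark 3.x location

// Needs the spark-sql-kafka-0-10 and spark-avro modules on the classpath.
// A main() method is used because Spark advises against extending scala.App.
object AvroReadMessage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("AvroReadMessage")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._ // enables the $"value" column syntax

    // Reader schema (JSON) used to decode the Kafka message value
    val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("/read_message.avsc")))

    val readKafkaDF = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic2")
      .option("startingOffsets", "latest")
      .load()

    // PERMISSIVE: records that fail to decode become null instead of failing the query
    val jmap = new java.util.HashMap[String, String]()
    jmap.put("mode", "PERMISSIVE")

    val query = readKafkaDF
      .select(from_avro($"value", jsonFormatSchema, jmap) as "value")
      .select("value.*")
      .writeStream.outputMode("append").format("console").start()

    query.awaitTermination()
  }
}
```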

Any help would be highly appreciated.

OneCricketeer
JDev

1 Answer


spark-avro is unable to read Confluent Schema Registry formatted data.
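Confluent's Avro serializer does not write bare Avro: each Kafka value starts with a magic byte `0`, then the 4-byte big-endian ID of the writer schema in the Schema Registry, and only then the Avro binary body. `from_avro` tries to decode those 5 header bytes as Avro, which can produce the malformed-record error. The minimal plain-Scala sketch below splits a value into header and payload; `ConfluentWireFormat`, `Framed` and `parse` are hypothetical names.

```scala
import java.nio.ByteBuffer

object ConfluentWireFormat {
  // Confluent framing: byte 0 = magic byte (0), bytes 1-4 = schema ID
  // (big-endian int), remaining bytes = plain Avro binary.
  final case class Framed(schemaId: Int, avroPayload: Array[Byte])

  val MagicByte: Byte = 0
  val HeaderSize: Int = 5

  def parse(bytes: Array[Byte]): Either[String, Framed] =
    if (bytes == null || bytes.length < HeaderSize) Left("too short for a Confluent header")
    else if (bytes(0) != MagicByte) Left(f"unexpected magic byte 0x${bytes(0) & 0xff}%02x")
    else {
      val buf = ByteBuffer.wrap(bytes) // ByteBuffer is big-endian by default
      buf.get()                        // skip the magic byte
      val schemaId = buf.getInt()      // read the 4-byte schema ID
      Right(Framed(schemaId, bytes.drop(HeaderSize)))
    }

  def main(args: Array[String]): Unit = {
    val sample = Array[Byte](0, 0, 0, 0, 7, 2, 'a'.toByte)
    parse(sample) match {
      case Right(f)  => println(s"schema id ${f.schemaId}, ${f.avroPayload.length} payload bytes")
      case Left(err) => println(s"not Confluent-framed: $err")
    }
  }
}
```

When every message is known to be written with the same schema as the `.avsc` file, a commonly used workaround is to drop the header in Spark before decoding, e.g. `from_avro(expr("substring(value, 6)"), jsonFormatSchema)`. That ignores the schema ID, so it breaks once the producer's schema evolves; a Schema Registry-aware deserializer avoids that problem.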

Please refer to Integrating Spark Structured Streaming with the Confluent Schema Registry

OneCricketeer