
I am building a Spark application that will run 24*7, and we use Spark at runtime to infer the schema of the incoming streams. The first time a stream is seen, the application constructs and saves its schema; afterwards it reads the saved schema. Since the application runs continuously, I cannot compare and update the schema for every stream manually. I want to handle the scenario in which a stream contains a new field that is not present in the saved schema. Currently, the process ignores the new field and loads only the fields defined in the schema into the target tables.

%python

from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import functions as F

# Saved schema that the incoming JSON values are parsed against
schema = StructType([
    StructField("lower", StringType(), True),
    StructField("upper", StringType(), True),
    StructField("literal", StringType(), True),
    StructField("text", StringType(), True),
])

topic = "test-topic"  # placeholder topic name for this reproduction

inputKafkaData = spark.createDataFrame(data=[
    (topic, "A", """{"lower": "a", "upper": "A","literal":"a","text":"abc"}"""),
    (topic, "B", """{"PARTIAL_lower": "a", "upper": "A","literal":"b","text":"abc"}"""),
    (topic, "C", """{"p": "a", "q": "A","literal":"a","text":"abc"}"""),
    (topic, "D", """{"lower": "a", "upper": "A","literal":"a","text":"123","text_3":"abc"}""")
]).toDF("topic", "key", "value")

dataset = inputKafkaData.selectExpr("value").withColumn("letter", F.from_json("value", schema))

dataset.select("letter.*").show()



**I added an extra field `text_3` in the 4th record of the DataFrame, but it does not show up in the target table.**
    
    dataset:pyspark.sql.dataframe.DataFrame
    value:string
    letter:struct
    lower:string
    upper:string
    literal:string
    text:string
    
    +-----+-----+-------+----+
    |lower|upper|literal|text|
    +-----+-----+-------+----+
    |    a|    A|      a| abc|
    | null|    A|      b| abc|
    | null| null|      a| abc|
    |    a|    A|      a| 123|
    +-----+-----+-------+----+
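
For reference, here is a minimal sketch of the kind of detection I have in mind (the column names `json_keys` and `new_fields` are just illustrative): parse each value a second time as a generic `map<string,string>` and compare its keys against the saved schema, so that records carrying unknown fields can at least be flagged instead of being silently dropped.

    known_fields = schema.fieldNames()

    # Parse each payload again as a generic map so every top-level JSON key
    # is visible, regardless of the saved schema.
    all_keys = F.map_keys(F.from_json("value", "map<string,string>"))

    flagged = (dataset
               .withColumn("json_keys", all_keys)
               .withColumn("new_fields",
                           F.array_except("json_keys",
                                          F.array([F.lit(f) for f in known_fields]))))

    # Records that contain fields not present in the saved schema
    flagged.filter(F.size("new_fields") > 0).select("value", "new_fields").show(truncate=False)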
  • Sounds like schema registry could help you deal with not having to manually define the schemas in code. You'd have to create a Kafka topic with the schema for each stream and update it as it changes. Here's a related question with a Python template at the end: https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry – matkurek Jun 21 '21 at 20:51
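
A minimal sketch of the schema-registry idea from the comment above, assuming the `confluent_kafka` Python client, a placeholder registry URL, and the default `<topic>-value` subject naming (all of these are assumptions, not part of my current setup):

    from confluent_kafka.schema_registry import SchemaRegistryClient

    # Registry URL and subject name are placeholders for illustration only.
    sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})

    latest = sr_client.get_latest_version(f"{topic}-value")
    print(latest.version, latest.schema.schema_str)

    # A bumped `version` for the subject would signal that the stream's schema
    # has changed and the saved Spark schema needs to be rebuilt or merged.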

0 Answers