I am building a Spark application that will run 24/7, and we use Spark at runtime to infer the schema of the incoming streams. The first time a stream is seen, the application constructs the schema; on subsequent runs it reads the saved schema. Since the application runs continuously, I cannot compare and update the schema for every stream. I want to handle the scenario in which a stream contains a new field that is not present in the saved schema. Currently, the process ignores the new field and loads only the fields defined in the schema into the target tables.
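For context, the save/load cycle works roughly like the minimal sketch below. It assumes the schema is persisted as JSON at a placeholder path; save_schema and load_schema are illustrative helper names, not our actual code.

import json
from pyspark.sql.types import StructType

SCHEMA_PATH = "/dbfs/schemas/stream_schema.json"  # placeholder path for this example

def save_schema(df):
    # First run: persist the inferred schema as JSON.
    with open(SCHEMA_PATH, "w") as f:
        f.write(df.schema.json())

def load_schema():
    # Subsequent runs: rebuild the StructType from the saved JSON.
    with open(SCHEMA_PATH) as f:
        return StructType.fromJson(json.load(f))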
%python
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import functions as F

# Fixed schema used to parse the incoming JSON payloads.
schema = StructType([
    StructField("lower", StringType(), True),
    StructField("upper", StringType(), True),
    StructField("literal", StringType(), True),
    StructField("text", StringType(), True),
])
topic = "test_topic"  # placeholder topic name so the example is self-contained

# Sample records mimicking the Kafka payloads; record "D" carries the extra field text_3.
inputKafkaData = spark.createDataFrame(data=[
    (topic, "A", """{"lower": "a", "upper": "A","literal":"a","text":"abc"}"""),
    (topic, "B", """{"PARTIAL_lower": "a", "upper": "A","literal":"b","text":"abc"}"""),
    (topic, "C", """{"p": "a", "q": "A","literal":"a","text":"abc"}"""),
    (topic, "D", """{"lower": "a", "upper": "A","literal":"a","text":"123","text_3":"abc"}""")]
).toDF("topic", "key", "value")
# Parse each value with the fixed schema; fields not present in the schema are silently dropped.
dataset = inputKafkaData.selectExpr("value").withColumn("letter", F.from_json("value", schema))
dataset.select("letter.*").show()
I added an extra field, text_3, to the fourth record of the DataFrame, but it does not show up in the target table.
dataset: pyspark.sql.dataframe.DataFrame
  value: string
  letter: struct
    lower: string
    upper: string
    literal: string
    text: string
+-----+-----+-------+----+
|lower|upper|literal|text|
+-----+-----+-------+----+
| a| A| a| abc|
| null| A| b| abc|
| null| null| a| abc|
| a| A| a| 123|
+-----+-----+-------+----+
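One pattern that might help here, sketched under my own assumptions rather than an established solution: parse the raw value a second time as a generic map so no key is lost, then compare its keys with the saved schema's field names. The column names parsed_map, extra_fields, and has_new_fields are illustrative, and array_except requires Spark 2.4+.

from pyspark.sql.types import MapType, StringType
from pyspark.sql import functions as F

# Field names the saved schema already knows about, as an array column.
known_fields = F.array(*[F.lit(name) for name in schema.fieldNames()])

flagged = (inputKafkaData
    .withColumn("parsed_map", F.from_json("value", MapType(StringType(), StringType())))
    .withColumn("extra_fields", F.array_except(F.map_keys("parsed_map"), known_fields))
    .withColumn("has_new_fields", F.size("extra_fields") > 0))

flagged.select("key", "extra_fields", "has_new_fields").show(truncate=False)

Records where has_new_fields is true could then be routed to a side path for schema evolution instead of being silently truncated by from_json.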