
I'm trying to ingest some MongoDB collections into BigQuery using PySpark. The schema looks like this:

root
 |-- groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- my_field: struct (nullable = true)
 |    |    |    |-- { mongo id }: struct (nullable = true)
 |    |    |    |    |-- A: timestamp (nullable = true)
 |    |    |    |    |-- B: string (nullable = true)
 |    |    |    |    |-- C: struct (nullable = true)
 |    |    |    |    |    |-- abc: boolean (nullable = true)
 |    |    |    |    |    |-- def: boolean (nullable = true)
 |    |    |    |    |    |-- ghi: boolean (nullable = true)
 |    |    |    |    |    |-- xyz: boolean (nullable = true)

The issue is that inside my_field we store the id, and each group has its own id, so when I import everything into BigQuery I end up with a new column for each id. I want to convert my_field to a string and store all the nested fields as JSON, or something like that. But when I try to convert it I get this error:

temp_df = temp_df.withColumn("groups.my_field", col("groups.my_field").cast('string'))

TypeError: Column is not iterable
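For reference, here's a minimal sketch of a DataFrame with this shape (the group_1/group_2 keys and the sample values are made-up stand-ins for real mongo ids, and only the B field is kept to stay short):

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Two rows whose my_field structs are keyed by different ids. Schema
# inference merges them into separate nested fields, which is exactly
# what fans out into one BigQuery column per id.
temp_df = spark.createDataFrame([
    Row(groups=[Row(my_field=Row(group_1=Row(B="foo")))]),
    Row(groups=[Row(my_field=Row(group_2=Row(B="bar")))]),
])
temp_df.printSchema()

# The attempt from the question: note that withColumn treats
# "groups.my_field" as a plain top-level column name, so it cannot
# rewrite a field that lives inside an array of structs.
temp_df = temp_df.withColumn("groups.my_field",
                             col("groups.my_field").cast("string"))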

What am I missing?


1 Answer


So it turns out that in order to append/remove/rename a nested field you need to rebuild the schema; I didn't know that. So here's my answer: I copied the code from https://stackoverflow.com/a/48906217/984114 and modified it to work with my schema.

Here's the modified version of "exclude_nested_field":

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

def change_nested_field_type(schema, fields_to_change, parent=""):
    new_schema = []

    # A string leaf needs no rebuilding.
    if isinstance(schema, StringType):
        return schema

    for field in schema:
        full_field_name = field.name

        if parent:
            full_field_name = parent + "." + full_field_name

        if full_field_name not in fields_to_change:
            if isinstance(field.dataType, StructType):
                # Recurse into nested structs.
                inner_schema = change_nested_field_type(field.dataType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, inner_schema))
            elif isinstance(field.dataType, ArrayType):
                # Recurse into the element type of arrays.
                inner_schema = change_nested_field_type(field.dataType.elementType, fields_to_change, full_field_name)
                new_schema.append(StructField(field.name, ArrayType(inner_schema)))
            else:
                # Plain field: keep its type as-is.
                new_schema.append(StructField(field.name, field.dataType))
        else:
            # This is the field we want to flatten: declare it as a plain
            # string so its whole JSON body stays in one value.
            new_schema.append(StructField(field.name, StringType()))

    return StructType(new_schema)

And here's how I call the function:

from pyspark.sql.functions import from_json, to_json

# Rebuild the element schema of "groups" with my_field forced to string.
new_schema = ArrayType(change_nested_field_type(df.schema["groups"].dataType.elementType, ["my_field"]))

# Round-trip through JSON: serialize the whole array, then parse it back
# with the new schema, so my_field comes back as a single JSON string.
df = df.withColumn("json", to_json("groups")).drop("groups")
df = df.withColumn("groups", from_json("json", new_schema)).drop("json")