In this article, we will see how to handle nested arrays in a PySpark DataFrame on Databricks.
Towards the end of this article, we will also cover best practices to keep in mind when working with PySpark DataFrame transformations and arrays, so that data processing stays efficient and effective.
I have the below sample JSON, which contains a mix of array fields and objects:
[
  {
    "name": "Alice",
    "date_field": "2022-03-30",
    "area": {
      "city": {
        "city_code": "asdas",
        "date_field": "2022-03-30"
      },
      "projects": [
        {
          "area_code": "sdas",
          "date_field": "2022-03-30"
        }
      ]
    }
  }
]
PySpark DataFrame transformations
PySpark DataFrame transformations involve operations used to manipulate data within DataFrames.
There are various ways and common use cases where these transformations can be applied:
- Filtering Data: Use the filter() or where() functions to keep only the rows you need.
- Selecting Columns: Use the select() function to choose specific columns from the DataFrame. This is useful when you only need certain columns for further processing or analysis.
- Grouping and Aggregating: Use functions like groupBy() and agg() to group data based on one or more columns and perform aggregations such as sum, count, average, etc.
- Joining DataFrames: Use the join() function to combine two DataFrames based on a common key.
- Sorting Data: Use the orderBy() or sort() functions to sort the DataFrame based on one or more columns.
- Adding or Removing Columns: Use functions like withColumn() and drop() to add new columns to the DataFrame or remove existing columns, respectively.
- String Manipulation: Use functions like substring(), trim(), lower(), upper(), etc., to perform string operations on DataFrame columns.
- Date and Time Manipulation: Use functions like to_date(), year(), month(), dayofmonth(), etc., from the pyspark.sql.functions module to work with date and time columns. A short sketch combining several of these operations follows this list.
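To make the above concrete, here is a minimal sketch that chains a few of these operations together. The data and column names (name, city, salary) are hypothetical and only illustrate the API:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("TransformationsSketch").getOrCreate()

# Hypothetical sample data, for illustration only
df = spark.createDataFrame(
    [("Alice", "Pune", 1000), ("Bob", "Delhi", 1500), ("Alice", "Pune", 2000)],
    ["name", "city", "salary"],
)

result = (
    df.filter(F.col("salary") > 1200)              # Filtering Data
      .withColumn("bonus", F.col("salary") * 0.1)  # Adding a Column
      .select("name", "city", "salary", "bonus")   # Selecting Columns
      .groupBy("city")                             # Grouping and Aggregating
      .agg(F.sum("salary").alias("total_salary"))
      .orderBy(F.col("total_salary").desc())       # Sorting Data
)
result.show()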
If you have a basic data source and need to transform a few fields, such as performing date and time manipulation, you can try the below steps to achieve the transformation.
Define StructType schema in PySpark
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Define the schema (matching the sample JSON above)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("date_field", StringType(), True),
    StructField("area", StructType([
        StructField("city", StructType([
            StructField("city_code", StringType(), True),
            StructField("date_field", StringType(), True)
        ]), True),
        StructField("projects", ArrayType(StructType([
            StructField("area_code", StringType(), True),
            StructField("date_field", StringType(), True)
        ])), True)
    ]), True)
])
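A quick way to sanity-check the schema before loading real data is to create an empty DataFrame from it and print the resulting tree (a minimal sketch, assuming an active SparkSession named spark):

# Verify the nested structure renders as expected
df_empty = spark.createDataFrame([], schema)
df_empty.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- date_field: string (nullable = true)
#  |-- area: struct (nullable = true)
#  ...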
Modify date field datatype in DataFrame schema
Update the schema as below for the date fields, converting them from string type to timestamp type:
StructField("date_field", TimestampType(), True)
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType

# Define the schema with timestamp fields
schema = StructType([
    StructField("name", StringType(), True),
    StructField("date_field", TimestampType(), True),
    StructField("area", StructType([
        StructField("city", StructType([
            StructField("city_code", StringType(), True),
            StructField("date_field", TimestampType(), True)
        ]), True),
        StructField("projects", ArrayType(StructType([
            StructField("area_code", StringType(), True),
            StructField("date_field", TimestampType(), True)
        ])), True)
    ]), True)
])
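One caveat: the sample values like "2022-03-30" are date-only strings, and depending on your Spark version the default timestampFormat may not match them, leaving the timestamp fields null. If that happens, you can pass the format explicitly when reading the JSON (a sketch; the input path is hypothetical):

# Tell the JSON reader how to parse date-only strings as timestamps
df = (
    spark.read
    .schema(schema)
    .option("timestampFormat", "yyyy-MM-dd")
    .json("dbfs:/tmp/sample.json")  # hypothetical path
)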
Convert JSON list to JSON string with indentation
# Convert the JSON list to a JSON string with indentation
import json
json_string = json.dumps(json_list, indent=2)
The full end-to-end script then looks as below,
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType
from pyspark.sql.functions import col, explode, to_date

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Transform JSON Data") \
    .getOrCreate()

# Convert the JSON list (the sample JSON shown above) to a JSON string
json_string = json.dumps(json_list, indent=2)

# Create DataFrame from JSON data with the defined schema
df = spark.read.schema(schema).json(spark.sparkContext.parallelize([json_string]))

# Write DataFrame to the destination (placeholder format name)
df.write.format("destination").mode("append").save()

# Stop SparkSession
spark.stop()
The above is a generic implementation and can be used to push the data to any destination as required, including MongoDB, SQL, etc.
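As a concrete stand-in for the placeholder destination, here is what the same write looks like with the built-in Parquet format (a sketch; the output path is hypothetical):

# Example: persist the transformed data as Parquet files
df.write.format("parquet").mode("append").save("/tmp/output/nested_json")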
Approach 2 - Explode nested array in DataFrame
One can also use the DataFrame explode() function to flatten the nested projects array, and then convert the string fields to date fields, as explained in the below example. Note that withColumn with a dotted name would create a new top-level column rather than updating the nested field, so the nested fields are updated via Column.withField (available in Spark 3.1+).
# Apply transformations to nested fields
# (Column.withField requires Spark 3.1+)
df_transformed = df \
    .withColumn("date_field", to_date(col("date_field"))) \
    .withColumn("area", col("area")
                .withField("city.date_field",
                           to_date(col("area.city.date_field")))) \
    .withColumn("project", explode(col("area.projects"))) \
    .withColumn("project", col("project")
                .withField("date_field", to_date(col("project.date_field"))))
Do you have any comments or ideas or any better suggestions to share?
Please sound off your comments below.
Happy Coding !!
Please bookmark this page and share it with your friends. Please subscribe to the blog to receive notifications on freshly published (2024) best practices and guidelines for software design and development.