Python – PySpark Connect to MongoDB
Today in this article, we will see how to use PySpark Connect to MongoDB using Python code examples.
We will make use of MongoDB Spark Connector which helps connect to MongoDB and allows us to read and write data between PySpark and MongoDB.
Here’s a basic example demonstrating how to read data from MongoDB, perform a transformation, and then write the results back to MongoDB.
We will use a sample Orders collection in the TheCodeBuzz database; sample documents from this collection are shown in the write example below.
Install the MongoDB Spark Connector
First, install PySpark using the command below. Note that the MongoDB Spark Connector itself is not installed via pip; it is a Spark package that is typically pulled in through the spark.jars.packages configuration (or the --packages option) when the session is created, as shown in the next section.
Command
pip install pyspark
For more details, please visit the official MongoDB website:
https://www.mongodb.com/products/spark-connector
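If you launch PySpark from the command line, you can also pull the connector in at startup. This is a minimal sketch; the connector version (10.2.1) and Scala build (2.12) are assumptions, so adjust them to match your Spark installation.
Command
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:10.2.1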
Create a SparkSession Object
Once PySpark is installed, you can create a SparkSession object as below,
from pyspark.sql import SparkSession

# The connector coordinates and config keys below target the 10.x connector;
# adjust the version to match your Spark/Scala build.
my_spark = SparkSession \
    .builder \
    .appName("TheCodeBuzz") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.1") \
    .config("spark.mongodb.read.connection.uri", "mongodb://host/TheCodeBuzz.Orders") \
    .config("spark.mongodb.write.connection.uri", "mongodb://host/TheCodeBuzz.Orders") \
    .getOrCreate()
We create a Spark session with the MongoDB connector configurations. We will use this SparkSession object to read data from and write data to MongoDB.
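As an optional sanity check, you can confirm the session is up before moving on:

# Optional sanity check that the session is up
print(my_spark.version)               # Spark version in use
print(my_spark.sparkContext.appName)  # prints "TheCodeBuzz"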
Read Data from MongoDB using PySpark
You must specify the following configuration settings to read from MongoDB:
dataFrame = my_spark.read \
    .format("mongodb") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "Orders") \
    .load()
The read property above returns a DataFrameReader object, which you can use to specify the format and other configuration settings for the batch read operation.
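After loading, you can inspect and transform the DataFrame with the usual PySpark APIs. A small sketch, assuming the Orders documents contain the qty field shown later in this article:

dataFrame.printSchema()   # show the inferred schema of the Orders collection
dataFrame.show()          # preview the documents

# Standard DataFrame transformations apply; for example, keep only
# orders with a quantity of at least 25 (qty is an assumed field)
high_qty = dataFrame.filter(dataFrame["qty"] >= 25)
high_qty.show()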
Write data to MongoDB using PySpark
To write data to MongoDB, call the write function on your DataFrame object.
You must specify the following configuration settings to write the data to MongoDB,
data = [
    {
        "Order": "journal2",
        "qty": 25,
        "books": ["white"],
        "domain": [14],
        "Link": "https://www.thecodebuzz.com/order/23456"
    },
    {
        "Order": "journal1",
        "qty": 25,
        "books": ["black"],
        "domain": [15],
        "Link": "https://www.thecodebuzz.com/order/2324"
    }
]
# Build a DataFrame directly from the Python dictionaries
dataFrame = my_spark.createDataFrame(data)

dataFrame.write.format("mongodb") \
    .mode("append") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "Orders") \
    .option("upsertDocument", "true") \
    .save()
In the example above, the createDataFrame() function creates a DataFrame object from the Python dictionaries, and write.format("mongodb") saves it to the Orders collection.
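Putting it all together, here is a small end-to-end sketch that reads the Orders collection, performs a transformation, and writes the results back, as promised at the start of this article. OrdersSummary is a hypothetical target collection used only for illustration.

from pyspark.sql import functions as F

# Read the Orders collection
orders = my_spark.read \
    .format("mongodb") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "Orders") \
    .load()

# Transformation: add a column with the doubled quantity
transformed = orders.withColumn("qty_doubled", F.col("qty") * 2)

# Write the result back to a hypothetical OrdersSummary collection
transformed.write.format("mongodb") \
    .mode("append") \
    .option("database", "TheCodeBuzz") \
    .option("collection", "OrdersSummary") \
    .save()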
That’s all! Happy coding!
Does this help you fix your issue?
Do you have any better solutions or suggestions? Please sound off in the comments below.
Please bookmark this page and share it with your friends. Please subscribe to the blog to receive notifications on freshly published (2024) best practices and guidelines for software design and development.