The following article provides tips for writing to a Postgres database using Spark.
Note: These tips are based on my experience of working with Spark 2.4.5 and 3.0.0.

- DStream vs Structured Streaming
There are two options available when choosing Spark Streaming: DStream (batch processing) or Structured Streaming (real-time processing, or near real-time to be precise).
If we have millions of rows to read and write in a very limited time, then it is best to choose Structured Streaming (for example, reading 10 million rows and writing them to the DB in 10 to 15 seconds). Although the actual throughput depends on DB performance, the resources available to Spark, and many other factors, when I tried the above scenario with both, Structured Streaming clearly performed far better than DStream. One reason is that Structured Streaming reads data in a structured format (DataFrames), whereas DStream reads it in an unstructured format (RDDs).
- Number of partitions
The number of partitions that the data is split into also plays an important role when writing to the DB.
If we go with DStream, we have an inbuilt method for writing to databases, whereas we don't have that option in Structured Streaming; there we have to use either the foreach or the foreachBatch sink. Read more about foreach and foreachBatch at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
For both the foreach and foreachBatch sinks, the number of connections opened to the DB depends on the number of partitions. There is a limit to how many connections can be open at a time; the default is 100 for Postgres. So, depending on how many Spark jobs and applications are trying to access the DB at a time, we have to choose the number of partitions accordingly. We can raise the connection limit as we like, but there is a trade-off between database performance and managing connections.
Reference: https://help.compose.com/docs/postgresql-connection-limits
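As a rough illustration (the function name, DataFrame, and the value 8 below are placeholders, not something from the article or the repo), the partition count, and therefore the connection count, can be capped just before the write, for example inside a foreachBatch function where the micro-batch behaves like a normal DataFrame:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: in foreach/foreachBatch each partition opens its own DB connection,
// so capping the partition count caps the number of concurrent connections.
// The value 8 is arbitrary; tune it against Postgres max_connections (default 100)
// and the other Spark jobs and applications sharing the database.
def boundPartitions(batchDf: DataFrame): DataFrame = {
  println(s"partitions before: ${batchDf.rdd.getNumPartitions}")
  batchDf.coalesce(8) // at most 8 partitions => at most 8 DB connections per batch
}
```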
- Data skewness in partitions
After selecting the number of partitions, the next thing to be wary of is data skewness across the partitions. Data skew is not an issue with Spark itself; rather, it is a data problem, caused by an uneven distribution of the underlying data coming from the source. To overcome this we can use
`dataframe.repartition(10) // number of partitions we need`
This will shuffle the data among the partitions and distribute it equally. But the shuffle will cost some seconds, depending on the amount of data.
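To see whether the partitions are actually balanced, a quick check like the one below can help. `spark_partition_id` is a standard Spark SQL function; the DataFrame name is a placeholder, and the groupBy/show check only works on a static (non-streaming) DataFrame, for example a sample of the source data or a micro-batch.

```scala
import org.apache.spark.sql.functions.spark_partition_id

// Rough skew check: count the rows that landed in each partition.
// A heavily uneven distribution here is the data skew described above.
df.groupBy(spark_partition_id().alias("partition_id"))
  .count()
  .orderBy("partition_id")
  .show(100, truncate = false)

// Redistribute the rows evenly across 10 partitions (this triggers a full shuffle).
val balanced = df.repartition(10)
```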
If we have a source like Kafka, then we have the option to distribute the data evenly among the topic partitions. So, while reading the Kafka topic, our data is already evenly distributed, and we should try to maintain the same partitioning while writing to the DB, unless the connection-limit restrictions mentioned above apply. This saves the seconds we would otherwise lose repartitioning the data.
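For completeness, here is a minimal sketch of the Kafka read; the broker list, topic name, and value casting are placeholder assumptions. By default each Kafka topic partition maps to one Spark partition, so an evenly distributed topic arrives already balanced.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-to-postgres").getOrCreate()

// Placeholder brokers and topic; each Kafka topic partition becomes one Spark
// partition, so an evenly keyed topic arrives already balanced.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my_topic")
  .load()

// Keep the same partitioning through to the write: avoid repartition()/coalesce()
// here unless the connection-limit considerations above force a lower number.
val valueDf = kafkaDf.selectExpr("CAST(value AS STRING) AS value")
```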
- Choosing between Foreach and ForeachBatch
This really depends on the use case. The Spark documentation has a detailed explanation: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
- Choosing the SQL statement to write to DB
Generally, when we write to the DB we use an INSERT statement. As mentioned earlier, if we choose DStream, the inbuilt write method uses INSERT statements. This works absolutely fine; there is no issue with it unless we have a huge amount of data to write. If we have a huge number of records to be written to the DB, there is another way to write: the COPY statement, which writes data faster than INSERT.
Sample COPY statement: `COPY TABLE_NAME (col1, col2) FROM/TO 'filename' DELIMITER ' ';`
So the main question is how to use this COPY statement in our Spark application.
Short answer: using the CopyManager class from the Postgres JDBC driver.
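The core of that approach looks roughly like the snippet below. `CopyManager` and `copyIn` come from the Postgres JDBC driver (`org.postgresql.copy.CopyManager`); the connection URL, credentials, table, and columns are placeholders.

```scala
import java.io.StringReader
import java.sql.DriverManager
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// Placeholder connection details; the cast to BaseConnection assumes a plain
// (non-pooled) JDBC connection.
val conn = DriverManager.getConnection(
  "jdbc:postgresql://localhost:5432/mydb", "user", "password")
val copyManager = new CopyManager(conn.asInstanceOf[BaseConnection])

// CopyManager streams delimiter-separated text into Postgres via the COPY protocol.
val rows = new StringReader("1,alice\n2,bob\n")
val copied = copyManager.copyIn(
  "COPY my_table (id, name) FROM STDIN WITH (FORMAT csv)", rows)

println(s"rows copied: $copied")
conn.close()
```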
For this purpose, I have developed sample code.
Link to GitHub: https://github.com/NOWSHAD76/Spark/tree/main/Spark_copy_cmd
In this code, I have used the foreach sink. We can also use the foreachBatch sink, with slight changes; the logic to follow is similar in both cases.
Note: Go through the official Spark documentation for the foreach sink first to understand the algorithm below.
Algorithm for foreach sink (a sketch follows the list):
1) Read the data frame
2) Do the necessary transformations
3) Write the final data frame to the DB using the foreach sink:
a) open method — open the connection to the DB and initialize the necessary variables
b) process method — if required, apply row-level transformations, then append the row to a string builder
c) close method — write the string builder to the DB and close the connection
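To make the three methods concrete, here is a rough sketch of such a writer. This is not the code from the GitHub repo above; the connection URL, credentials, table name, and the naive comma-joined CSV formatting are placeholder assumptions.

```scala
import java.io.StringReader
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// Sketch of a foreach sink writer that buffers rows and flushes them in one COPY.
class PostgresCopyWriter extends ForeachWriter[Row] {
  @transient private var conn: Connection = _
  @transient private var copyManager: CopyManager = _
  private val buffer = new StringBuilder

  // open: one connection per partition and epoch; return true to process the rows
  override def open(partitionId: Long, epochId: Long): Boolean = {
    conn = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/mydb", "user", "password") // placeholder
    copyManager = new CopyManager(conn.asInstanceOf[BaseConnection])
    buffer.clear()
    true
  }

  // process: apply any row-level transformation and append the row to the buffer
  // (comma-joining the row is a simplification; real CSV needs quoting/escaping)
  override def process(row: Row): Unit =
    buffer.append(row.mkString(",")).append("\n")

  // close: flush the whole buffer with a single COPY, then release the connection
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null && buffer.nonEmpty) {
      copyManager.copyIn(
        "COPY my_table (col1, col2) FROM STDIN WITH (FORMAT csv)",
        new StringReader(buffer.toString))
    }
    if (conn != null) conn.close()
  }
}
```

It would then be attached to the streaming query with something like `finalDf.writeStream.foreach(new PostgresCopyWriter).start()`.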
Algorithm for foreachBatch sink (a sketch follows the list):
1) Read the dataframe
2) Do the necessary transformations
3) Write the final data frame to the DB using the foreachBatch sink:
a) Inside each micro-batch, process the data partition by partition
b) Add the rows in each partition to a string builder
c) Write the string builder to the DB
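A comparable sketch for the foreachBatch variant, again with placeholder connection details, table, and CSV formatting rather than the repo's actual code:

```scala
import java.io.StringReader
import java.sql.DriverManager
import org.apache.spark.sql.{DataFrame, Row}
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// Sketch: inside every micro-batch, write each partition with a single COPY call.
def copyBatchToPostgres(batchDf: DataFrame, batchId: Long): Unit = {
  batchDf.rdd.foreachPartition { rows: Iterator[Row] =>
    val buffer = new StringBuilder
    rows.foreach(row => buffer.append(row.mkString(",")).append("\n"))

    if (buffer.nonEmpty) {
      val conn = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/mydb", "user", "password") // placeholder
      try {
        val copyManager = new CopyManager(conn.asInstanceOf[BaseConnection])
        copyManager.copyIn(
          "COPY my_table (col1, col2) FROM STDIN WITH (FORMAT csv)",
          new StringReader(buffer.toString))
      } finally {
        conn.close()
      }
    }
  }
}

// usage sketch:
// finalDf.writeStream.foreachBatch(copyBatchToPostgres _).start()
```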
FAQ:
1) Why store the rows in a string builder?
Ans) When using the COPY statement through CopyManager, we need a stream of input, which we can create from the string builder.
2) In the foreach sink, why are we writing to the DB in the close method rather than the process method?
Ans) The foreach sink is generally used when we want to transform data at the row level, which we do in the process method. If we also wrote the transformed data to the DB in the process method, we would be writing one record at a time, which defeats the purpose of using COPY and would make the processing very slow. That is why we store all the rows in the string builder and write them all while closing the connection.
3) Can we control the number of records that are written to the DB at a time?
Ans) Yes, it is possible to limit the number of records written at a time. For this, we need to change the logic a little. This is implemented in the sample code I have provided on GitHub.
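One way the logic could be adjusted (not necessarily the exact approach used in the linked repo) is to flush the buffer every N rows in the process method, so a single COPY never carries more than N records. A rough sketch, with `maxRows` and the connection details as placeholders:

```scala
import java.io.StringReader
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection

// Variant of the foreach sink writer sketched earlier: flush every `maxRows`
// records instead of only on close, so each COPY call is bounded in size.
class BoundedCopyWriter(maxRows: Int = 10000) extends ForeachWriter[Row] {
  @transient private var conn: Connection = _
  @transient private var copyManager: CopyManager = _
  private val buffer = new StringBuilder
  private var buffered = 0

  override def open(partitionId: Long, epochId: Long): Boolean = {
    conn = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/mydb", "user", "password") // placeholder
    copyManager = new CopyManager(conn.asInstanceOf[BaseConnection])
    buffer.clear(); buffered = 0
    true
  }

  override def process(row: Row): Unit = {
    buffer.append(row.mkString(",")).append("\n")
    buffered += 1
    if (buffered >= maxRows) flush() // cap the number of rows per COPY call
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) flush() // write whatever is still buffered
    if (conn != null) conn.close()
  }

  private def flush(): Unit = {
    if (buffer.nonEmpty) {
      copyManager.copyIn(
        "COPY my_table (col1, col2) FROM STDIN WITH (FORMAT csv)",
        new StringReader(buffer.toString))
      buffer.clear()
      buffered = 0
    }
  }
}
```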