Big Data Unit Testing in PySpark


The rapid increase in the amount of data we produce every day has pushed big data technology into our lives very quickly. Storing and crunching this data is the easy part; testing it in a simple, repeatable manner is still a question for many teams. A proper testing suite is one of the fundamental building blocks that differentiates software engineering from hacking. And when it comes to delivering high-quality software to production quickly, PySpark is usually the first name that comes to mind.

Unlike traditional solutions, PySpark is a widely adopted technology that can fulfil these needs: Apache Spark provides a framework that can process data very quickly and in a distributed manner.

Need for Testing

The application has to be tested thoroughly end-to-end, and the migration from the existing system to the new system has to be validated as well. This includes testing with old data, new data, or a combination of both, and with both the old (unchanged) features and the new ones. The possibility of defects introduced by the migration is very high; many of them are related to the data schema, so they need to be identified and fixed during testing. We also run tests to ensure that the response time of the new or upgraded application is the same as, or less than, that of the old application, and to verify that the connections between servers, hardware, software, etc. remain intact and do not break during testing. Data flow between the different components should not break under any condition.

Automation testing and its Need

Since big data is a collection of large datasets that cannot be processed using traditional computing techniques, there is a need for automated test scripts. Automation testing uses an automation tool to execute your test case suite, including repetitive tests that run across multiple builds. Manual testing takes a lot of effort and time, whereas automation testing runs with ease and without introducing human errors.

Data Migration from Oracle DB to Hadoop (BigData) in Pyspark

The preferred technique for processing data stored in our RDBMS databases with Apache Spark is to migrate the data to Hadoop (HDFS) first, then read the data stored in Hadoop (HDFS) in a distributed fashion and process it with Spark.

This program runs in a Docker container that can connect directly to the Hadoop environment, with an Oracle JDBC jar provided to connect to the Oracle DB. We create the tables in the Oracle database that we will read from and insert sample data into them.
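The validation script in this article assumes that the migration itself has already been run. For context, a minimal sketch of such a migration step is shown below, assuming the Oracle JDBC jar is already on the Spark classpath; the connection string, table name, and HDFS path are placeholders rather than values from the original setup:

from pyspark.sql import SparkSession

# Hypothetical connection details - replace with your own environment's values
jdbc_url = "jdbc:oracle:thin:username/password@hostname:1234/database_name"

spark = SparkSession.builder.appName("oracle_to_hdfs_migration").getOrCreate()

# Read the source table from Oracle over JDBC
source_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "TABLE_1") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()

# Write the data to HDFS as ORC, which is the format the validation script reads back
source_df.write.mode("overwrite").orc("/folder_1/folder_2/table_1/")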

IMPORT THE REQUIRED PACKAGES IN SPARK SHELL

Import the required packages in the Spark shell using the code given below:

import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from functools import reduce
import re
import time

CREATE SPARK SESSION

A Spark session can be created using the builder pattern. The session builder returns an existing SparkSession if one has already been created; otherwise it creates a new one and assigns it as the global default.

def sparkSession(variable_0, variable_1, variable_2, variable_3):
     # variable_0 is the application name; variable_1 to variable_3 point to the Oracle JDBC jar
     spark = SparkSession \
                 .builder \
                 .appName(variable_0) \
                 .config("spark.driver.extraClassPath", variable_1) \
                 .config("spark.executor.extraClassPath", variable_2) \
                 .config("spark.jars", variable_3) \
                 .getOrCreate()
     return spark

DEFINING TEST SUMMARY TABLE

The test summary table holds one row per test case, with a Parameter column that describes the check and a Status column that each test function fills in with Passed or Failed. The rows start out with an empty Status and are updated as the validations run against each table.

Here are the checks that this script performs:

>Table Name
>Column Name
>Column Count
>Record Count
>Data Types
>Duplicates based on Primary keys
>Nullability

The summary rows for each test case are defined as follows:

def testCasesSummary(spark):
     t1=spark.createDataFrame([('Table name should match with OracleDB','')]).toDF("Parameter","Status")
     t2=spark.createDataFrame([('Column Count should match with OracleDB','')]).toDF("Parameter","Status")
     t3=spark.createDataFrame([('Record Count should match with OracleDB','')]).toDF("Parameter","Status")
     t4=spark.createDataFrame([('Column Names should match with OracleDB','')]).toDF("Parameter","Status")
     t5=spark.createDataFrame([('Datatypes should match with OracleDB','')]).toDF("Parameter","Status")
     t6=spark.createDataFrame([('Nullability should match with OracleDB','')]).toDF("Parameter","Status")
     t7=spark.createDataFrame([('Duplicates should not be present based on primary key','')]).toDF("Parameter","Status")
     return t1,t2,t3,t4,t5,t6,t7

 


Table Name Validation - the test case code below records the result of the table name check in a DataFrame.

def testTableName(t1, source_tablename, target_tablename):
     if (source_tablename == "Not Exist") or (target_tablename == "Not Exist"):
          t1=spark.createDataFrame([('Table name should match with OracleDB','Failed')]).toDF("Parameter","Status")
     else:
          t1=spark.createDataFrame([('Table name should match with OracleDB','Passed')]).toDF("Parameter","Status")
     return t1

Column Count Validation - the test case code below records the result of the column count check in a DataFrame.

def testColumnCount(t2, table, targetdf):
     sourceTable_Column_Count = table.select("COLUMN_NAME").count()
     targetTable_Column_Count = len(targetdf.columns)
     if sourceTable_Column_Count != targetTable_Column_Count:
         t2=spark.createDataFrame([('Column Count should match with OracleDB','Failed')]).toDF("Parameter","Status")
     else:
         t2=spark.createDataFrame([('Column Count should match with OracleDB','Passed')]).toDF("Parameter","Status")
     return t2,sourceTable_Column_Count,targetTable_Column_Count

Record Count Validation - the test case code below records the result of the record count check in a DataFrame.

def testRecordCount(t3, raw_df, targetdf):
     # raw_df is the one-row result of the COUNT(*) query against Oracle,
     # so the source record count is its first value
     sourceTable_Rec_Count = raw_df.collect()[0][0]
     targetTable_Rec_Count = targetdf.count()
     if sourceTable_Rec_Count != targetTable_Rec_Count:
         t3=spark.createDataFrame([('Record Count should match with OracleDB','Failed')]).toDF("Parameter","Status")
     else:
         t3=spark.createDataFrame([('Record Count should match with OracleDB','Passed')]).toDF("Parameter","Status")
     return t3,sourceTable_Rec_Count,targetTable_Rec_Count

Column Name Validation - the test case code below records the result of the column name check in a DataFrame.

def testColumnName(t4, table, targetdf):
     for col_1 in table.columns:
         table = table.withColumn(col_1, f.lower(f.col(col_1)))
     table.select("COLUMN_NAME").show(table.count())
     targetdf.printSchema()
     sourcedf_Columns = table.select("COLUMN_NAME").rdd.flatMap(lambda x: x).collect()
     targetdf_Columns = targetdf.columns
     Status = "Passed"
     for i in sourcedf_Columns:
         if i not in targetdf_Columns:
             # one missing column is enough to fail the whole check
             Status = "Failed"
     if Status == "Failed":
        t4=spark.createDataFrame([('Column Names should match with OracleDB','Failed')]).toDF("Parameter","Status")
     else:
        t4=spark.createDataFrame([('Column Names should match with OracleDB','Passed')]).toDF("Parameter","Status")
     return t4,sourcedf_Columns

Datatype Validation - the test case code below records the result of the datatype check in a DataFrame.

def testDataType(t5, table, targetdf):
     target_data_Types_List = targetdf.dtypes
     target_dtype_schema = StructType([
           StructField("target_ColumnName", StringType(), True),
           StructField("target_DataType", StringType(), True)])
     target_dtype_df_schema = spark.createDataFrame(target_data_Types_List, target_dtype_schema)
     
     target_dtype_df_schema.show(target_dtype_df_schema.count())
    
     target_datatypes_df = target_dtype_df_schema \
            .withColumn("target_ColumnName", f.trim(f.col("target_ColumnName"))) \
            .withColumn("target_DataType", f.trim(f.col("target_DataType")))

     for col_1 in table.columns:
                  table = table.withColumn(col_1, f.lower(f.col(col_1)))
     source_datatypes_df = table \
           .withColumn("source_ColumnName", f.trim(f.col("COLUMN_NAME"))) \
           .withColumn("source_DataType", f.trim(f.col("DATA_TYPE")))
     source_datatypes_df.select('source_ColumnName').show(source_datatypes_df.count())
     Compare_datatype_df = source_datatypes_df \
           .join(target_datatypes_df, 
                    target_datatypes_df["target_ColumnName"] == source_datatypes_df["source_ColumnName"],"inner")
     Compare_datatype_df.show(Compare_datatype_df.count())
     # map Oracle data types to their Hadoop/Spark equivalents before comparing
     Compare_datatype_df = Compare_datatype_df \
           .withColumn("formattedSourceDBType",
                 f.when(f.col("source_DataType") == "number", "int")
                  .when(f.col("source_DataType") == "double", "double")
                  .when(f.col("source_DataType") == "varchar2", "string")
                  .when(f.col("source_DataType") == "char", "string")
                  .when(f.col("source_DataType") == "date", "timestamp"))

     Compare_datatype_df.select(['target_ColumnName','target_DataType','source_ColumnName','source_DataType','formattedSourceDBType']).show(Compare_datatype_df.count())

     Compare_datatype_df_filtered = Compare_datatype_df.filter( (f.col("target_DataType") != f.col("formattedSourceDBType")) & (f.col("target_ColumnName")==f.col("source_ColumnName"))  )
     Compare_datatype_df_filtered.show(Compare_datatype_df_filtered.count())

     Compare_datatype_df_numbered = Compare_datatype_df.filter( ((f.col("formattedSourceDBType") == "int") & (f.col("target_DataType") == "double")) | ((f.col("formattedSourceDBType") == "double") & (f.col("target_DataType") == "int"))  )
     Compare_datatype_df_numbered.show(Compare_datatype_df_numbered.count())

     Compare_datatype_df_filtered=Compare_datatype_df_filtered.subtract(Compare_datatype_df_numbered)
     Compare_datatype_df_filtered.show(Compare_datatype_df_filtered.count())

     Compare_datatype_df_filtered_count = Compare_datatype_df_filtered.count()

     if Compare_datatype_df_filtered_count > 0:
           t5=spark.createDataFrame([('Datatypes should match with OracleDB','Failed')]).toDF("Parameter","Status")
     else:
           t5=spark.createDataFrame([('Datatypes should match with OracleDB','Passed')]).toDF("Parameter","Status")
     return t5

Nullability Validation - the test case code below records the result of the nullability check in a DataFrame.

def testNullability(t6, targetdf, table):
     print("Entered testing nullability")
     sourceNulldf=table.where((col("NULLABLE") == 'N'))
     sourceNulldf.show(sourceNulldf.count())
     for col_1 in sourceNulldf.columns:
              sourceNulldf = sourceNulldf.withColumn(col_1, f.lower(f.col(col_1)))
     sourceNullColumnsList = sourceNulldf.select("COLUMN_NAME").rdd.flatMap(lambda x: x).collect()
     targetNullColumnsList = sourceNullColumnsList
     targetNulldf=targetdf.select([column for column in targetdf.columns if column in targetNullColumnsList])

     try:
        targetNull=targetNulldf.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in targetNullColumnsList)))
     except Exception as e:
        targetNull=None
     if targetNull is None or targetNull.count()==0:
        t6=spark.createDataFrame([('Nullability should match with OracleDB','Passed')]).toDF("Parameter","Status")
     else:
        t6=spark.createDataFrame([('Nullability should match with OracleDB','Failed')]).toDF("Parameter","Status")
     return t6
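Duplicate Validation - the driver code further down calls a testduplicateRows function that checks for duplicate rows based on the primary key columns fetched from Oracle, but its definition is not shown in this script. A minimal sketch of what such a function could look like is given below; the grouping logic here is an assumption, not the original implementation.

def testduplicateRows(t7, table_pk, targetdf, targetTable_Rec_Count):
     # Assumed logic: group the Hadoop data by the primary key columns fetched
     # from Oracle and flag any key combination that occurs more than once
     pk_columns = [row[0].lower() for row in table_pk.select("COLUMN_NAME").collect()]
     pk_columns = [c for c in pk_columns if c in targetdf.columns]
     if pk_columns:
          targetdf_dups = targetdf.groupBy(pk_columns).count().filter(f.col("count") > 1)
     else:
          # no usable primary key: fall back to comparing distinct and total row counts
          targetdf_dups = targetdf.limit(0) if targetdf.distinct().count() == targetTable_Rec_Count else targetdf
     if targetdf_dups.count() > 0:
          t7=spark.createDataFrame([('Duplicates should not be present based on primary key','Failed')]).toDF("Parameter","Status")
     else:
          t7=spark.createDataFrame([('Duplicates should not be present based on primary key','Passed')]).toDF("Parameter","Status")
     return t7, targetdf_dups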

#Enter list of tables that we need to test:
tablesList = ['table_1','table_2','table_3........, table_100']

#Create Spark Session with defined jars/Drivers
variable_0 = "test_Hadoop"

#Path to the Oracle JDBC jar in the Hadoop location (the jar path can also be supplied via Spark configuration at runtime)
variable_1 = "folder_1/ojdbc8.12.jar"
variable_2 = "folder_1/ojdbc8.12.jar"
variable_3 = "folder_1/ojdbc8.12.jar"
spark = sparkSession(variable_0, variable_1, variable_2, variable_3)

Loop over the tables to be tested:

for tableName in tablesList:
   start_time = time.time()
   tableName = str.lower(tableName)
   try:

Connect to Hadoop 

try:
    #Target path in Hadoop
    targetPath = "/folder_1/folder_2/"+str.lower(tableName)+"/"
    targetdf = spark.read.format("orc").option("header","true").option("inferSchema", "true").load(targetPath)
    target_tablename = 'Exist'
except Exception as e:
    target_tablename = "Not Exist"

Connect to Oracle DB

# Loading Oracle DB (source) connection details
hostname = "hostname"
dbname = "database_name"
jdbcPort = 1234
username = "***"
password = "***"

#1 Query to fetch Primary keys within an Oracle table

try:
    jdbc_url = "jdbc:oracle:thin:{0}/{1}@{2}:{3}/{4}".format(username,password,hostname,jdbcPort,dbname)
    query_fetch_pk = "(SELECT COLUMN_NAME FROM all_cons_columns WHERE constraint_name = (SELECT constraint_name FROM all_constraints WHERE UPPER(table_name) = UPPER('" + tableName.upper() + "') AND CONSTRAINT_TYPE = 'P'))"
    table_pk = spark.read \
                    .format("jdbc") \
                    .option("url", jdbc_url) \
                    .option("dbtable", query_fetch_pk) \
                    .option("user", username) \
                    .option("password", password) \
                    .option("driver", "oracle.jdbc.driver.OracleDriver") \
                    .load()
    source_tablename = "Exists"
except Exception as e:
    source_tablename = "Not Exist"

#2 Query to get column names, dataTypes, and Nullability from OracleDB

fetch_schema = "(select * FROM user_tab_columns where TABLE_NAME = '" + tableName.upper() + "')"
table = spark.read \
                    .format("jdbc") \
                    .option("url", jdbc_url) \
                    .option("dbtable", fetch_schema) \
                    .option("user", username) \
                    .option("password", password) \
                    .option("driver", "oracle.jdbc.driver.OracleDriver") \
                    .load()
col_list = ['TABLE_NAME','COLUMN_ID','COLUMN_NAME','DATA_TYPE','DATA_LENGTH','NULLABLE','DATA_DEFAULT']
table=table.select(col_list)
table = table.withColumn("newcol",when(table["NULLABLE"] == 'N', False).otherwise(True))
collist = table.select("COLUMN_NAME").rdd.flatMap(lambda x: x).collect()

#3 Query to get record count from OracleDB

query_record_count = "(select count(*) FROM " + tableName.upper() + ")"
table_rec = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", query_record_count) \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "oracle.jdbc.driver.OracleDriver") \
        .load()

COMPARE ORACLE DB VS HADOOP

#Executing functions defined
    t1,t2,t3,t4,t5,t6,t7 = testCasesSummary(spark)

#Test Cases
    t1 = testTableName(t1, source_tablename, target_tablename)
    t2,sourceTable_Column_Count,targetTable_Column_Count = testColumnCount(t2,table,targetdf)
    t3,sourceTable_Rec_Count,targetTable_Rec_Count = testRecordCount(t3,table_rec,targetdf)
    t4,sourcedf_Columns = testColumnName(t4,table,targetdf)
    t5 = testDataType(t5,table,targetdf)
    t7,targetdf_dups = testduplicateRows(t7, table_pk, targetdf, targetTable_Rec_Count)
    t6 = testNullability(t6, targetdf, table)

Test cases summary table

    testSchema = StructType([ StructField("Parameter", StringType(), True)\
    ,StructField("Status", StringType(), True)
    ])
    df=spark.createDataFrame([], testSchema)
    dfs = [t1,t2,t3,t4,t5,t6,t7]
    df = reduce(DataFrame.unionAll, dfs)

    print("--- {11b4d86a5810527c77a0cf7d4ce0e3afd0fbcf990461d3f22f24a3792c474f0e}s seconds ---" {11b4d86a5810527c77a0cf7d4ce0e3afd0fbcf990461d3f22f24a3792c474f0e} (time.time() - begin_time))
except Exception as e:
    print(e)
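The loop above only prints the elapsed time and any exception for each table. If you also want to keep the results of a run, one small assumption-based extension (the output path below is a placeholder, not part of the original script) is to tag the per-table summary with its table name and write it to HDFS right after it is built inside the loop:

    # Hypothetical extension: persist the per-table summary to HDFS for later review
    summary_path = "/folder_1/test_results/" + tableName + "/"
    df.withColumn("Table", f.lit(tableName)) \
      .coalesce(1) \
      .write.mode("overwrite") \
      .option("header", "true") \
      .csv(summary_path)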

From a testing point of view, all of this means that the application has to be tested thoroughly end-to-end, and that the migration from the existing system to the new system has to be validated with the same rigour.

About the Author

Sebastian Srikanth Kumar is an Associate Business Analyst at Factspan with a keen interest in automation projects such as WhatsApp automation. He is fond of games and loves to keep himself up to date with daily tech news.
