The rapid growth in the amount of data we produce every day has pushed big data technology into our lives very quickly. Storing and processing this data is largely a solved problem, but how to test it in a straightforward way is still a question for many teams. A proper testing suite is one of the fundamental building blocks that separates hacking from software engineering. When it comes to delivering high-quality software to production quickly, PySpark is one of the first names that comes to mind.
Unlike many traditional solutions, PySpark can meet these needs: it is the Python API for Apache Spark, a framework that processes data very quickly and in a distributed manner.
Need for Testing
The application has to be tested thoroughly, end to end, along with the migration from the existing system to the new system. This includes testing with old data, new data, or a combination of both, as well as the old (unchanged) features and the new features. The possibility of defects introduced by the migration is high, and many of them relate to the data schema, so these defects need to be identified and fixed during testing. We also run tests to ensure that the response time of the new or upgraded application is the same as, or better than, that of the old application, and to verify that the connections between servers, hardware, software, and so on remain intact and do not break during testing. Data flow between the different components should not break under any condition.
Automation testing and its Need
Since big data is a collection of large datasets that cannot be processed using traditional computing techniques, there is a need for automated test scripts. Automation testing uses an automation tool to execute your test case suite, including repetitive tests that run across multiple builds. Manual testing takes a lot of effort and time, whereas automation testing runs with ease and without introducing human error.
Data Migration from Oracle DB to Hadoop (Big Data) in PySpark
The preferred technique for processing data stored in an RDBMS with Apache Spark is to migrate the data to Hadoop (HDFS) first, then read it back from HDFS in a distributed way and process it with Spark.
This program runs in a Docker container that can connect directly to the Hadoop environment, using the Oracle JDBC jar provided to connect to Oracle DB. We create the tables in the Oracle database that we will read from, and insert sample data into them.
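As a rough illustration of that first migration step, here is a minimal sketch. It assumes a SparkSession named spark has already been created (as shown later in this article), and the JDBC URL, credentials, table name, and HDFS path are placeholders rather than values from this project:
# Minimal migration sketch: read a table from Oracle over JDBC and land it in HDFS as ORC
# (the URL, credentials, table name, and target path below are placeholders)
jdbc_url = "jdbc:oracle:thin:username/password@hostname:1521/database_name"
source_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "SOURCE_TABLE") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
source_df.write.mode("overwrite").orc("/folder_1/folder_2/source_table/")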
IMPORT THE REQUIRED PACKAGES IN SPARK SHELL
Import the packages into the Spark shell using the code given below:
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from functools import reduce
import re
import time
CREATE SPARK SESSION
A Spark session can be created using the builder pattern. The session builder returns an existing SparkSession if one has already been created; otherwise it creates a new one and registers it as the global default.
def sparkSession(variable_0, variable_1, variable_2, variable_3):
    spark = SparkSession \
        .builder \
        .appName(variable_0) \
        .config("spark.driver.extraClassPath", variable_1) \
        .config("spark.executor.extraClassPath", variable_2) \
        .config("spark.jars", variable_3) \
        .getOrCreate()
    return spark
DEFINING TEST SUMMARY TABLE
The test summary table is built from a set of single-row test case DataFrames, one per validation. Each row holds the test name in the Parameter column and a Status that is filled in later when the corresponding test runs.
Here are the validations that this script performs:
>Table Name
>Column Name
>Column Count
>Record Count
>Data Types
>Test duplicates based on Primary keys
>Nullability
Defining Test Summary Table
def testCasesSummary(spark):
    t1 = spark.createDataFrame([('Table name should match with OracleDB', '')]).toDF("Parameter", "Status")
    t2 = spark.createDataFrame([('Column Count should match with OracleDB', '')]).toDF("Parameter", "Status")
    t3 = spark.createDataFrame([('Record Count should match with OracleDB', '')]).toDF("Parameter", "Status")
    t4 = spark.createDataFrame([('Column Names should match with OracleDB', '')]).toDF("Parameter", "Status")
    t5 = spark.createDataFrame([('Datatypes should match with OracleDB', '')]).toDF("Parameter", "Status")
    t6 = spark.createDataFrame([('Nullability should match with OracleDB', '')]).toDF("Parameter", "Status")
    t7 = spark.createDataFrame([('Duplicates should not be present based on primary key', '')]).toDF("Parameter", "Status")
    return t1, t2, t3, t4, t5, t6, t7
Enter the list of tables that we need to test
tablesList = ['table_1','table_2','table_3.........']
Create Spark Session with defined jars/Drivers
You can add the path to the jar file using the Spark configuration at runtime; refer to the Spark documentation for more information. Extract the downloaded jar file, then define the variables below and set their values as shown:
variable_0 = "test"
# Path to the OracleDB jar in the Hadoop location
variable_1 = "folder_1/ojdbc8.12.jar"
variable_2 = "folder_1/ojdbc8.12.jar"
variable_3 = "folder_1/ojdbc8.12.jar"

spark = sparkSession(variable_0, variable_1, variable_2, variable_3)

for tableName in tablesList:
    start_time = time.time()
Table Name Validation - The test case code below records the table name validation result in the summary DataFrame.
def testTableName(t1, source_tablename, target_tablename):
    if (source_tablename == "Not Exist") or (target_tablename == "Not Exist"):
        t1 = spark.createDataFrame([('Table name should match with OracleDB', 'Failed')])
    else:
        t1 = spark.createDataFrame([('Table name should match with OracleDB', 'Passed')])
    return t1
Column Count Validation - The test case code below records the column count validation result in the summary DataFrame.
def testColumnCount(t2, table, targetdf):
    sourceTable_Column_Count = table.select("COLUMN_NAME").count()
    targetTable_Column_Count = len(targetdf.columns)
    if sourceTable_Column_Count != targetTable_Column_Count:
        t2 = spark.createDataFrame([('Column Count should match with OracleDB', 'Failed')])
    else:
        t2 = spark.createDataFrame([('Column Count should match with OracleDB', 'Passed')])
    return t2, sourceTable_Column_Count, targetTable_Column_Count
Record Count Validation - The test case code below records the record count validation result in the summary DataFrame.
def testRecordCount(t3, raw_df, targetdf):
    # raw_df is the result of the count(*) query, a single row holding the source record count
    sourceTable_Rec_Count = raw_df.collect()[0][0]
    targetTable_Rec_Count = targetdf.count()
    if sourceTable_Rec_Count != targetTable_Rec_Count:
        t3 = spark.createDataFrame([('Record Count should match with OracleDB', 'Failed')])
    else:
        t3 = spark.createDataFrame([('Record Count should match with OracleDB', 'Passed')])
    return t3, sourceTable_Rec_Count, targetTable_Rec_Count
Column Name Validation - The test case code below records the column name validation result in the summary DataFrame.
def testColumnName(t4, table, targetdf):
    for col_1 in table.columns:
        table = table.withColumn(col_1, f.lower(f.col(col_1)))
    table.select("COLUMN_NAME").show(table.count())
    targetdf.printSchema()
    sourcedf_Columns = table.select("COLUMN_NAME").rdd.flatMap(lambda x: x).collect()
    targetdf_Columns = targetdf.columns
    Status = ""
    for i in sourcedf_Columns:
        if i in targetdf_Columns:
            Status = "Passed"
        else:
            Status = "Failed"
            break  # one missing column is enough to fail the test
    if Status == "Failed":
        t4 = spark.createDataFrame([('Column Names should match with OracleDB', 'Failed')])
    else:
        t4 = spark.createDataFrame([('Column Names should match with OracleDB', 'Passed')])
    return t4, sourcedf_Columns
Datatype Validation - The test case code below records the datatype validation result in the summary DataFrame.
def testDataType(t5, table, targetdf):
    target_data_Types_List = targetdf.dtypes
    target_dtype_schema = StructType([
        StructField("target_ColumnName", StringType(), True),
        StructField("target_DataType", StringType(), True)])
    target_dtype_df_schema = spark.createDataFrame(target_data_Types_List, target_dtype_schema)
    target_dtype_df_schema.show(target_dtype_df_schema.count())
    target_datatypes_df = target_dtype_df_schema \
        .withColumn("target_ColumnName", f.trim(f.col("target_ColumnName"))) \
        .withColumn("target_DataType", f.trim(f.col("target_DataType")))
    for col_1 in table.columns:
        table = table.withColumn(col_1, f.lower(f.col(col_1)))
    source_datatypes_df = table \
        .withColumn("source_ColumnName", f.trim(f.col("COLUMN_NAME"))) \
        .withColumn("source_DataType", f.trim(f.col("DATA_TYPE")))
    source_datatypes_df.select('source_ColumnName').show(source_datatypes_df.count())
    Compare_datatype_df = source_datatypes_df \
        .join(target_datatypes_df,
              target_datatypes_df["target_ColumnName"] == source_datatypes_df["source_ColumnName"],
              "inner")
    Compare_datatype_df.show(Compare_datatype_df.count())
    # Map Oracle source types to the equivalent Spark types before comparing
    Compare_datatype_df = Compare_datatype_df \
        .withColumn("formattedSourceDBType",
                    f.when(f.col("source_DataType") == 'number', "int")
                     .otherwise(f.when(f.col("source_DataType") == 'double', "double")
                     .otherwise(f.when(f.col("source_DataType") == "varchar2", "string")
                     .otherwise(f.when(f.col("source_DataType") == 'char', "string")
                     .otherwise(f.when(f.col("source_DataType") == "date", "timestamp"))))))
    Compare_datatype_df.select(['target_ColumnName', 'target_DataType', 'source_ColumnName',
                                'source_DataType', 'formattedSourceDBType']).show(Compare_datatype_df.count())
    Compare_datatype_df_filtered = Compare_datatype_df.filter(
        (f.col("target_DataType") != f.col("formattedSourceDBType")) &
        (f.col("target_ColumnName") == f.col("source_ColumnName")))
    Compare_datatype_df_filtered.show(Compare_datatype_df_filtered.count())
    # int vs double mismatches are tolerated and subtracted from the failures
    Compare_datatype_df_numbered = Compare_datatype_df.filter(
        ((f.col("formattedSourceDBType") == "int") & (f.col("target_DataType") == "double")) |
        ((f.col("formattedSourceDBType") == "double") & (f.col("target_DataType") == "int")))
    Compare_datatype_df_filtered = Compare_datatype_df_filtered.subtract(Compare_datatype_df_numbered)
    Compare_datatype_df_filtered.show(Compare_datatype_df_filtered.count())
    Compare_datatype_df_filtered_count = Compare_datatype_df_filtered.count()
    if Compare_datatype_df_filtered_count > 0:
        t5 = spark.createDataFrame([('Datatypes should match with OracleDB', 'Failed')])
    else:
        t5 = spark.createDataFrame([('Datatypes should match with OracleDB', 'Passed')])
    return t5
Nullability Validation - The test case code below records the nullability validation result in the summary DataFrame.
def testNullability(t6, targetdf, table):
    print("Entered testing nullability")
    sourceNulldf = table.where(col("NULLABLE") == 'N')
    sourceNulldf.show(sourceNulldf.count())
    for col_1 in sourceNulldf.columns:
        sourceNulldf = sourceNulldf.withColumn(col_1, f.lower(f.col(col_1)))
    sourceNullColumnsList = sourceNulldf.select("COLUMN_NAME").rdd.flatMap(lambda x: x).collect()
    targetNullColumnsList = sourceNullColumnsList
    targetNulldf = targetdf.select([column for column in targetdf.columns if column in targetNullColumnsList])
    try:
        targetNull = targetNulldf.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in targetNullColumnsList)))
    except Exception as e:
        targetNull = []
    if targetNull == [] or targetNull.count() == 0:
        t6 = spark.createDataFrame([('Nullability should match with OracleDB', 'Passed')])
    else:
        t6 = spark.createDataFrame([('Nullability should match with OracleDB', 'Failed')])
    return t6
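The comparison step further below also calls a duplicate check based on the primary key (testduplicateRows), whose definition is not shown in this article. A minimal sketch of how such a check could be written, assuming the same table_pk DataFrame of primary-key column names fetched from Oracle and lower-cased column names in the Hadoop table, might be:
# Hypothetical sketch of the duplicate check used in the comparison step below
def testduplicateRows(t7, table_pk, targetdf, targetTable_Rec_Count):
    # Primary-key column names from Oracle, lower-cased to match the Hadoop column names
    pk_columns = [str(r[0]).lower() for r in table_pk.select("COLUMN_NAME").collect()]
    if pk_columns:
        distinct_count = targetdf.select(pk_columns).distinct().count()
    else:
        # No primary key defined: fall back to whole-row comparison
        distinct_count = targetdf.distinct().count()
    targetdf_dups = targetTable_Rec_Count - distinct_count
    if targetdf_dups > 0:
        t7 = spark.createDataFrame([('Duplicates should not be present based on primary key', 'Failed')])
    else:
        t7 = spark.createDataFrame([('Duplicates should not be present based on primary key', 'Passed')])
    return t7, targetdf_dups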
# Enter list of tables that we need to test:
tablesList = ['table_1', 'table_2', 'table_3........, table_100']

# Create Spark Session with defined jars/Drivers
variable_0 = "test_Hadoop"
# Path to the OracleDB jar in the Hadoop location
variable_1 = "folder_1/ojdbc8.12.jar"
variable_2 = "folder_1/ojdbc8.12.jar"
variable_3 = "folder_1/ojdbc8.12.jar"

spark = sparkSession(variable_0, variable_1, variable_2, variable_3)
Loop over the tables to be tested:
for tableName in tablesList:
    start_time = time.time()
    tableName = str.lower(tableName)
    try:
Connect to Hadoop
        try:
            # Target path in Hadoop
            targetPath = "/folder_1/folder_2/" + str.lower(tableName) + "/"
            targetdf = spark.read.format("orc").option("header", "true").option("inferSchema", "true").load(targetPath)
            target_tablename = 'Exist'
        except Exception as e:
            target_tablename = "Not Exist"
Connect to Oracle DB
        # 4. Loading OracleDB (source) connection details
        hostname = "hostname"
        dbname = "database_name"
        jdbcPort = 1234
        username = "***"
        password = "***"
#1 Query to fetch Primary keys within an Oracle table
        try:
            jdbc_url = "jdbc:oracle:thin:{0}/{1}@{2}:{3}/{4}".format(username, password, hostname, jdbcPort, dbname)
            query_fetch_pk = "(SELECT COLUMN_NAME FROM all_cons_columns WHERE constraint_name = " \
                             "(SELECT constraint_name FROM all_constraints WHERE UPPER(table_name) = UPPER('" \
                             + str.upper(tableName) + "') AND CONSTRAINT_TYPE = 'P'))"
            table_pk = spark.read \
                .format("jdbc") \
                .option("url", jdbc_url) \
                .option("dbtable", query_fetch_pk) \
                .option("user", username) \
                .option("password", password) \
                .option("driver", "oracle.jdbc.driver.OracleDriver") \
                .load()
            source_tablename = "Exists"
        except Exception as e:
            source_tablename = "Not Exist"
#2 Query to get column names, dataTypes, and Nullability from OracleDB
        fetch_schema = "(select * FROM user_tab_columns where TABLE_NAME = '" + str.upper(tableName) + "')"
        table = spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", fetch_schema) \
            .option("user", username) \
            .option("password", password) \
            .option("driver", "oracle.jdbc.driver.OracleDriver") \
            .load()
        col_list = ['TABLE_NAME', 'COLUMN_ID', 'COLUMN_NAME', 'DATA_TYPE', 'DATA_LENGTH', 'NULLABLE', 'DATA_DEFAULT']
        table = table.select(col_list)
        table = table.withColumn("newcol", when(table["NULLABLE"] == 'N', False).otherwise(True))
        collist = table.select("COLUMN_NAME").rdd.flatMap(lambda x: x).collect()
#3 Query to get record count from OracleDB
        # Note: the table name must not be wrapped in quotes in the FROM clause
        query_record_count = "(select count(*) FROM " + str.upper(tableName) + ")"
        table_rec = spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", query_record_count) \
            .option("user", username) \
            .option("password", password) \
            .option("driver", "oracle.jdbc.driver.OracleDriver") \
            .load()
COMPARE ORACLE DB VS HADOOP
        # Executing the functions defined above
        t1, t2, t3, t4, t5, t6, t7 = testCasesSummary(spark)
        # Test Cases
        t1 = testTableName(t1, source_tablename, target_tablename)
        t2, sourceTable_Column_Count, targetTable_Column_Count = testColumnCount(t2, table, targetdf)
        t3, sourceTable_Rec_Count, targetTable_Rec_Count = testRecordCount(t3, table_rec, targetdf)
        t4, sourcedf_Columns = testColumnName(t4, table, targetdf)
        t5 = testDataType(t5, table, targetdf)
        t6 = testNullability(t6, targetdf, table)
        t7, targetdf_dups = testduplicateRows(t7, table_pk, targetdf, targetTable_Rec_Count)
Test cases summary table
        testSchema = StructType([
            StructField("Parameter", StringType(), True),
            StructField("Status", StringType(), True)])
        df = spark.createDataFrame([], testSchema)
        dfs = [t1, t2, t3, t4, t5, t6, t7]
        df = reduce(DataFrame.unionAll, dfs)
        print("--- %s seconds ---" % (time.time() - start_time))
    except Exception as e:
        print(e)
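The script measures the elapsed time per table but does not show how the summary DataFrame df is reported. A small, hypothetical reporting step that could be placed inside the loop, just before the except clause, might look like this (the output path is a placeholder):
# Hypothetical reporting step (placeholder HDFS path): tag, print, and persist the per-table summary
df = df.withColumn("Table", f.lit(tableName))
df.show(truncate=False)
df.write.mode("overwrite").orc("/folder_1/test_results/" + tableName + "/")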
From a testing point of view, all of this means that the application has to be tested thoroughly, end to end, to confirm that the migration from the existing system to the new system completed successfully.
About the Author
Sebastian Srikanth Kumar is an Associate Business Analyst at Factspan with a keen interest in external automation projects such as WhatsApp automation. He is fond of games and keeps himself up to date with tech news daily.