from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
# PyDeequ data-quality demo: builds a tiny orders DataFrame containing deliberately
# bad rows, runs completeness / uniqueness / range / allowed-value checks on it and
# prints one result row per constraint.

# Create Spark Session
# The Deequ jar is resolved from Maven; its "-spark-3.3" suffix has to match the
# Spark version this script runs on.
# NOTE(review): recent PyDeequ releases also expect the SPARK_VERSION environment
# variable (e.g. "3.3") to be set before `pydeequ` is imported -- confirm for the
# installed version.
spark = (
    SparkSession.builder
    .appName("PyDeequ Data Quality Demo")
    .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.7-spark-3.3")
    # Exclusion that the PyDeequ setup instructions pair with the Deequ package.
    .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all")
    .getOrCreate()
)

try:
    # Sample Orders Data
    data = [
        (1, 101, "2026-06-01", 1200.50, "DELIVERED"),
        (2, 102, "2026-06-01", 800.00, "SHIPPED"),
        (3, 103, "2026-06-02", -200.00, "DELIVERED"),  # Invalid amount
        (4, None, "2026-06-02", 500.00, "CANCELLED"),  # Missing customer_id
        (5, 105, None, 999.00, "SHIPPED"),  # Missing order_date
        (5, 106, "2026-06-03", 300.00, "RETURNED"),  # Duplicate order_id + invalid status
    ]
    columns = ["order_id", "customer_id", "order_date", "order_amount", "order_status"]
    orders_df = spark.createDataFrame(data, columns)

    print("Input Orders Data:")
    orders_df.show()

    # Define PyDeequ Data Quality Checks
    # All constraints hang off a single Error-level check, so one failing
    # constraint is enough to put the whole check into the Error state.
    check = (
        Check(spark, CheckLevel.Error, "Orders Data Quality Check")
        .isComplete("order_id")
        .isUnique("order_id")
        .isComplete("customer_id")
        .isComplete("order_date")
        .isNonNegative("order_amount")
        .isContainedIn("order_status", ["DELIVERED", "SHIPPED", "CANCELLED"])
    )

    # Run Verification
    result = (
        VerificationSuite(spark)
        .onData(orders_df)
        .addCheck(check)
        .run()
    )

    # Convert result into DataFrame (one row per constraint)
    result_df = VerificationResult.checkResultsAsDataFrame(spark, result)

    print("Data Quality Check Result:")
    result_df.select(
        "check",
        "check_level",
        "check_status",
        "constraint",
        "constraint_status",
    ).show(truncate=False)
finally:
    # Cleanup prescribed by the PyDeequ README to avoid hanging processes: shut
    # down the Py4J callback server (PyDeequ may start one for Python assertion
    # callbacks) and stop the session. Runs even if a step above raised.
    # Keep this as the last thing in the script that touches `spark`.
    spark.sparkContext._gateway.shutdown_callback_server()
    spark.stop()
"""
Expected Output:
Input Orders Data:
+--------+-----------+----------+------------+------------+
|order_id|customer_id|order_date|order_amount|order_status|
+--------+-----------+----------+------------+------------+
|       1|        101|2026-06-01|      1200.5|   DELIVERED|
|       2|        102|2026-06-01|       800.0|     SHIPPED|
|       3|        103|2026-06-02|      -200.0|   DELIVERED|
|       4|       null|2026-06-02|       500.0|   CANCELLED|
|       5|        105|      null|       999.0|     SHIPPED|
|       5|        106|2026-06-03|       300.0|    RETURNED|
+--------+-----------+----------+------------+------------+
Data Quality Check Result:
+-------------------------+-----------+------------+--------------------------------------------------------------+-----------------+
|check                    |check_level|check_status|constraint                                                    |constraint_status|
+-------------------------+-----------+------------+--------------------------------------------------------------+-----------------+
|Orders Data Quality Check|Error      |Error       |CompletenessConstraint(Completeness(order_id,None,None))      |Success          |
|Orders Data Quality Check|Error      |Error       |UniquenessConstraint(Uniqueness(List(order_id),None,None))    |Failure          |
|Orders Data Quality Check|Error      |Error       |CompletenessConstraint(Completeness(customer_id,None,None))   |Failure          |
|Orders Data Quality Check|Error      |Error       |CompletenessConstraint(Completeness(order_date,None,None))    |Failure          |
|Orders Data Quality Check|Error      |Error       |ComplianceConstraint(Compliance(order_amount is non-negative))|Failure          |
|Orders Data Quality Check|Error      |Error       |ComplianceConstraint(Compliance(order_status contained in...))|Failure          |
+-------------------------+-----------+------------+--------------------------------------------------------------+-----------------+
Final Interpretation:
order_id completeness -> Passed
order_id uniqueness -> Failed because order_id = 5 is duplicate
customer_id completeness -> Failed because one record has null customer_id
order_date completeness -> Failed because one record has null order_date
order_amount non-negative -> Failed because one record has -200
order_status validation -> Failed because RETURNED is not allowed
Pipeline Decision:
Since check_status = Error, this dataset should not be loaded into Gold layer.
Bad records should be quarantined or fixed before publishing.
"""
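# NOTE: the two lines below are web-page footer text captured along with the
# snippet; they are commented out so the file parses as Python.
# Comments
# Post a Comment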