Transformations
Transformations in Atlas provide a powerful mechanism to convert data from one entity format to another. They are defined by a set of rules that systematically process input entities to produce desired output entities.
Transformation Scripts
At the core of Atlas transformations are Python scripts. These scripts execute against input entities and generate transformed output entities according to your specifications.
Within the transformation script, you have access to the Pontus library, which simplifies working with input and output entities. The Pontus library is a Python package that automatically loads input entities into Spark DataFrames and handles writing output entities back to the Atlas API. This eliminates boilerplate code, allowing you to focus exclusively on your transformation logic.
Here is an example of a transformation script that converts payroll income amounts to USD. The script would be stored in a GitHub repository shared with Pontus, allowing it to be executed during transformation operations.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

import pontus

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("Payroll Processing") \
    .getOrCreate()

# Define currency conversion rates to USD (example rates)
conversion_rates = {
    "USD": 1.0,
    "EUR": 1.1,   # 1 EUR = 1.1 USD
    "GBP": 1.3,   # 1 GBP = 1.3 USD
    "CAD": 0.75,  # 1 CAD = 0.75 USD
    # Add more currencies as needed
}

# Load the Monthly Payroll entity data into a Spark DataFrame
monthly_df = pontus.atlas.entities.get("Monthly Payroll")

# Convert each row's Income to USD based on its Currency
for currency, rate in conversion_rates.items():
    monthly_df = monthly_df.withColumn(
        "Income",
        when(col("Currency") == currency, col("Income") * lit(rate))
        .otherwise(col("Income"))
    )

# All amounts are now in USD, so update the Currency column accordingly
monthly_df = monthly_df.withColumn("Currency", lit("USD"))

# Write the transformed data to the Payroll History entity
pontus.atlas.entities.set("Payroll History", monthly_df, mode="overwrite")

spark.stop()
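If you want to sanity-check the conversion logic before wiring the script into a workflow, the same when/otherwise pattern can be exercised against a small hand-built DataFrame. The sketch below is purely illustrative: the sample rows and rates are made up and the Pontus library is not involved.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

spark = SparkSession.builder.appName("Conversion Check").getOrCreate()

# Illustrative sample rows; in Atlas these would come from the Monthly Payroll entity
sample_df = spark.createDataFrame(
    [("alice", 5000.0, "EUR"), ("bob", 4000.0, "GBP"), ("carol", 6000.0, "USD")],
    ["Employee", "Income", "Currency"],
)

conversion_rates = {"USD": 1.0, "EUR": 1.1, "GBP": 1.3}

# Same per-currency conversion pattern used in the transformation script above
for currency, rate in conversion_rates.items():
    sample_df = sample_df.withColumn(
        "Income",
        when(col("Currency") == currency, col("Income") * lit(rate))
        .otherwise(col("Income"))
    )

sample_df = sample_df.withColumn("Currency", lit("USD"))
sample_df.show()  # Incomes should now be approximately 5500, 5200 and 6000, all labelled USD

spark.stop()

Note that each when clause only touches rows whose Currency matches the currency being processed, and the Currency column is only rewritten after the loop, so a given row is converted at most once no matter how many currencies are in the rate table.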
Workflow Integration
Transformations are executed as steps within Atlas workflows. To run one, include the transformation as a defined step in your workflow configuration:
{
  "name": "transform",
  "type": "atlas",
  "atlas_data": {
    "action": "transform",
    "input_data": {
      "language": "pyspark",
      "code_path": "https://github.com/your-repo/transformations/payroll_transformation.py",
      "input_entities": ["monthly_payroll"],
      "output_entities": ["payroll_history"]
    }
  }
}
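Here, code_path points to the transformation script in the GitHub repository shared with Pontus, while input_entities and output_entities name the Atlas entities the script reads with pontus.atlas.entities.get and writes with pontus.atlas.entities.set.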