Transformation Temporal Workflow
The TransformationWorkflow in Data Nadhi processes logs through your user-defined pipeline nodes.
It applies transformations, evaluates conditions, and routes data to downstream destinations via DestinationWorkflow.
[GitHub Repository]: data-nadhi-temporal-worker
Overview
This workflow is triggered as a child workflow of MainWorkflow on the Temporal task queue.
It processes logs according to your pipeline configuration, handling multiple nodes and branching logic while preserving metadata and traceability.
Workflow Steps
1. Receive Input
The input payload contains:
- Pipeline Configuration: All nodes and their connections
- Log Data: The event or log to process
- Start Node ID: Node where traversal starts
- Context (`ctx`): Metadata for tracking (org, project, pipeline, and message IDs)
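Sketched as a Python dataclass, the payload might look like this (field names are illustrative assumptions, not the worker's exact schema):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkflowContext:
    organisation_id: str
    project_id: str
    pipeline_id: str
    message_id: str

@dataclass
class TransformationInput:
    pipeline_config: dict[str, Any]  # all nodes and their connections
    log_data: dict[str, Any]         # the event or log to process
    start_node_id: str               # node where traversal starts
    ctx: WorkflowContext             # tracking metadata
```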
2. Traverse Pipeline Nodes
- Nodes are processed in queue order (FIFO)
- Each node type determines the processing logic (see the sketch after this list):
  - Transformation Node (`type: transformation`):
    - Executes the `transform` activity
    - Returns `next_nodes` and the transformed data
    - Errors are logged via the `log_failure` activity but do not halt the workflow
  - Condition/Branching Node (`type: condition-branching`):
    - Executes the `filters` activity
    - Determines the next nodes based on conditions
    - Failures are logged but the workflow continues
  - End Node (`type: end`):
    - Executes the `DestinationWorkflow` child workflow
    - Routes data to the appropriate target queue
- Multiple outputs per node are stored in an array to maintain full node-level results.
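A minimal sketch of this per-type dispatch, written against the Temporal Python SDK (the activity names match the ones above, but argument shapes and timeouts are assumptions):

```python
from datetime import timedelta
from temporalio import workflow

async def process_node(node: dict, log_data: dict, ctx: dict) -> list[tuple[str, dict]]:
    """Dispatch one node to its handler; return (next_node_id, data) pairs to enqueue."""
    node_type = node["nodeConfig"]["type"]
    if node_type == "transformation":
        result = await workflow.execute_activity(
            "transform",
            args=[node, log_data],
            start_to_close_timeout=timedelta(seconds=30),
        )
        return [(n, result["data"]) for n in result["next_nodes"]]
    if node_type == "condition-branching":
        result = await workflow.execute_activity(
            "filters",
            args=[node, log_data],
            start_to_close_timeout=timedelta(seconds=30),
        )
        return [(n, log_data) for n in result["next_nodes"]]
    if node_type == "end":
        # Hand off to DestinationWorkflow; see step 4 for queue and ID derivation.
        await workflow.execute_child_workflow(
            "DestinationWorkflow", args=[node, log_data, ctx]
        )
    return []
```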
3. Queue Management
- Maintains a queue of nodes to process with their corresponding log data
- Appends next nodes returned by activities into the queue
- Ensures all paths are processed even if some nodes fail
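In outline, the queue logic reduces to a FIFO loop like the one below (a simplified sketch: the real loop runs inside workflow code, and `process` stands in for the per-type dispatch shown above):

```python
from collections import deque
from typing import Callable

def traverse(
    nodes_by_id: dict[str, dict],
    start_node_id: str,
    log_data: dict,
    process: Callable[[dict, dict], list[tuple[str, dict]]],
) -> None:
    """FIFO traversal: each queue entry pairs a node ID with the data flowing into it."""
    queue: deque[tuple[str, dict]] = deque([(start_node_id, log_data)])
    while queue:
        node_id, data = queue.popleft()
        try:
            next_pairs = process(nodes_by_id[node_id], data)
        except Exception:
            continue  # failures are logged via log_failure; other paths keep going
        queue.extend(next_pairs)  # append next nodes with their corresponding data
```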
4. Forward to Destination Workflow
- When an end node is reached, the workflow triggers `DestinationWorkflow` as a child workflow
- The child workflow is assigned to the destination queue (derived from the task queue name)
- It uses a workflow ID derived from the parent workflow ID for traceability
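The hand-off might look like this with the Temporal Python SDK (the exact queue-name and workflow-ID derivation scheme is an assumption):

```python
from temporalio import workflow

async def forward_to_destination(node: dict, log_data: dict, ctx: dict) -> None:
    info = workflow.info()
    await workflow.execute_child_workflow(
        "DestinationWorkflow",
        args=[node, log_data, ctx],
        # Destination queue derived from the current task queue name (assumed scheme)
        task_queue=info.task_queue.replace("transformation", "destination"),
        # Child workflow ID derived from the parent's ID for traceability
        id=f"{info.workflow_id}-destination",
    )
```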
Failure Handling with MinIO
All activity failures are logged to MinIO to preserve traceability without stopping the workflow.
The logs include:
- Exception type, message, and stack trace
- Original input data and current workflow context
- Node ID, org, project, pipeline, and message IDs
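A sketch of such a `log_failure` activity using the `minio` client (the bucket name, object layout, and failure-record shape are assumptions):

```python
import io
import json
from datetime import datetime, timezone
from minio import Minio
from temporalio import activity

# Placeholder endpoint and credentials; real values come from worker configuration.
client = Minio("minio:9000", access_key="...", secret_key="...", secure=False)

@activity.defn
async def log_failure(failure: dict) -> None:
    """Persist a failure record (exception details, input data, context) to MinIO."""
    body = json.dumps(failure).encode()
    object_name = (
        f"{failure['ctx']['pipeline_id']}/"           # assumed context shape
        f"{datetime.now(timezone.utc).isoformat()}.json"
    )
    client.put_object(
        "failure-logs",                               # assumed bucket name
        object_name,
        io.BytesIO(body),
        length=len(body),
        content_type="application/json",
    )
```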
Node Outputs
- Each node may produce multiple outputs during workflow execution
- Outputs are stored in a `node_outputs` dictionary keyed by node ID
- This provides a complete record of all transformations and conditional branches
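In code, this bookkeeping can be as small as a dictionary of lists (a sketch; the worker's actual structure may differ):

```python
from collections import defaultdict

# One list per node ID; a node can fire more than once across branching paths.
node_outputs: dict[str, list[dict]] = defaultdict(list)

def record_output(node_id: str, result: dict) -> None:
    node_outputs[node_id].append(result)

record_output("44444444-4444-4444-4444-444444444444", {"processing": {"level": "guest"}})
```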
Detailed Logic of Activities
transform Activity
The `transform` activity currently supports JSON-specific operations: `add_key` and `remove_key`. It is designed to be extensible, allowing more transformation functions to be added in the future.
Workflow Steps:
- Retrieve the `transformation_fn` key from the node configuration.
- Verify that the specified transformation function exists.
- If the function does not exist, raise an error.
- Execute the transformation function using the provided log data as input.
Example Node Config:
```json
{
"nodeConfig": {
"name": "Guest User Limitations",
"type": "transformation",
"transformation_fn": "Transformation.JSON.add_key",
"transformation_params": {
"key": "$.processing.level",
"value": "guest"
},
"input_type": "json",
"output_type": "json",
"next": [
"10101010-1010-1010-1010-101010101010"
]
},
"nodeId": "44444444-4444-4444-4444-444444444444",
"pipelineId": "e8f7e53a-2a81-491a-be83-afa5b7b1c218",
"projectId": "7219abae-3b7d-4f62-855c-6a32632df04a",
"organisationId": "6ecce9f6-58ce-4754-a278-a04558990987"
}
```
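A simplified sketch of the `transform` activity against the config above (the dotted-path handling is a minimal stand-in for full JSONPath support, and the registry layout is illustrative):

```python
from temporalio import activity

def _walk(data: dict, path: str) -> tuple[dict, str]:
    """Resolve a simple '$.a.b.c' path to (parent_dict, leaf_key), creating dicts."""
    parts = path.removeprefix("$.").split(".")
    for part in parts[:-1]:
        data = data.setdefault(part, {})
    return data, parts[-1]

def add_key(data: dict, key: str, value) -> dict:
    parent, leaf = _walk(data, key)
    parent[leaf] = value
    return data

def remove_key(data: dict, key: str, **_) -> dict:
    parent, leaf = _walk(data, key)
    parent.pop(leaf, None)
    return data

# Registry mapping transformation_fn names to callables (names from the config above).
TRANSFORMATIONS = {
    "Transformation.JSON.add_key": add_key,
    "Transformation.JSON.remove_key": remove_key,
}

@activity.defn
async def transform(node: dict, log_data: dict) -> dict:
    config = node["nodeConfig"]
    fn = TRANSFORMATIONS.get(config["transformation_fn"])
    if fn is None:  # steps 2-3: an unknown function is an error
        raise ValueError(f"Unknown transformation_fn: {config['transformation_fn']}")
    data = fn(log_data, **config.get("transformation_params", {}))
    return {"next_nodes": config.get("next", []), "data": data}
```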
filters Activity
The `filters` activity is used for branching and conditional routing between nodes in the pipeline.
Workflow Steps:
- Retrieve the `filters` key from the node configuration.
- Iterate over each filter rule.
- Rules without a `filter` expression (such as the `all` rule below) pass by default.
- Rules with a `filter` expression are evaluated by the rule engine to determine whether they pass.
- If a filter passes, collect all nodes listed under its `next` key and add them to the next-nodes list.
- Deduplicate the list of next nodes to avoid duplicate processing.
Example Node Config:
```json
{
"nodeConfig": {
"name": "Validate Input Data",
"type": "condition-branching",
"filters": {
"valid-authenticated-user": {
"filter": {
"and": [
{
"check": {
"key": "$.user.id",
"value": null,
"operator": "ne"
}
},
{
"check": {
"key": "$.user.status",
"value": "active",
"operator": "et"
}
},
{
"check": {
"key": "$.user.email_verified",
"value": true,
"operator": "et"
}
}
]
},
"next": [
"33333333-3333-3333-3333-333333333333"
]
},
"guest-user": {
"filter": {
"and": [
{
"check": {
"key": "$.user.type",
"value": "guest",
"operator": "et"
}
},
{
"check": {
"key": "$.user.permissions.guest_allowed",
"value": true,
"operator": "et"
}
}
]
},
"next": [
"44444444-4444-4444-4444-444444444444"
]
},
"suspended-user": {
"filter": {
"or": [
{
"check": {
"key": "$.user.status",
"value": "suspended",
"operator": "et"
}
},
{
"check": {
"key": "$.user.violations",
"value": 3,
"operator": "gte"
}
}
]
},
"next": [
"55555555-5555-5555-5555-555555555555"
]
},
"all": {
"next": [
"66666666-6666-6666-6666-666666666666"
]
}
}
},
"nodeId": "11111111-1111-1111-1111-111111111111",
"pipelineId": "e8f7e53a-2a81-491a-be83-afa5b7b1c218",
"projectId": "7219abae-3b7d-4f62-855c-6a32632df04a",
"organisationId": "6ecce9f6-58ce-4754-a278-a04558990987"
}
```
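A simplified sketch of the `filters` activity against the config above (the operators mirror the example, where `et`, `ne`, and `gte` are read as equal-to, not-equal, and greater-or-equal; the dotted-path lookup stands in for the real rule engine):

```python
from temporalio import activity

OPERATORS = {
    "et": lambda a, b: a == b,   # assumed: "equal to"
    "ne": lambda a, b: a != b,
    "gte": lambda a, b: a is not None and a >= b,
}

def _lookup(data: dict, path: str):
    """Resolve a simple '$.a.b' path; returns None when a segment is missing."""
    for part in path.removeprefix("$.").split("."):
        if not isinstance(data, dict) or part not in data:
            return None
        data = data[part]
    return data

def _evaluate(expr: dict, data: dict) -> bool:
    if "and" in expr:
        return all(_evaluate(e, data) for e in expr["and"])
    if "or" in expr:
        return any(_evaluate(e, data) for e in expr["or"])
    check = expr["check"]
    return OPERATORS[check["operator"]](_lookup(data, check["key"]), check["value"])

@activity.defn
async def filters(node: dict, log_data: dict) -> dict:
    next_nodes: list[str] = []
    for rule in node["nodeConfig"]["filters"].values():
        # Rules without a "filter" expression (like "all") pass by default.
        if "filter" not in rule or _evaluate(rule["filter"], log_data):
            next_nodes.extend(rule.get("next", []))
    # Deduplicate while preserving order.
    return {"next_nodes": list(dict.fromkeys(next_nodes))}
```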
TransformationWorkflow Summary
The TransformationWorkflow handles detailed node-level processing of logs:
- Processes transformation nodes (`add_key`, `remove_key`) and other JSON operations.
- Applies filters and branching for conditional workflow execution.
- Triggers DestinationWorkflow for end nodes.
- Keeps outputs per node for auditing and traceability.
- Logs activity-level failures to MinIO without stopping other nodes.
This design provides modular, fault-tolerant processing that supports high-throughput log transformations with complete observability.