
Transformation Temporal Workflow

The TransformationWorkflow in Data Nadhi processes logs through your user-defined pipeline nodes.
It applies transformations, evaluates conditions, and routes data to downstream destinations via DestinationWorkflow.

[GitHub Repository]: data-nadhi-temporal-worker


Overview

This workflow is triggered as a child workflow of MainWorkflow on the Temporal task queue.
It processes logs according to your pipeline configuration, handling multiple nodes and branching logic while preserving metadata and traceability.

Transformation Worker


Workflow Steps

1. Receive Input

The input payload contains the following fields (a rough sketch follows the list):

  • Pipeline Configuration: All nodes and their connections
  • Log Data: The event or log to process
  • Start Node ID: Node where traversal starts
  • Context (ctx): Metadata for tracking (org, project, pipeline, message IDs)
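
A rough sketch of that payload as a Python dictionary; the key names are assumptions, and the IDs are taken from the example configs later on this page:

payload = {
    "pipeline_config": {},  # all nodes and their connections, keyed by node ID (omitted here)
    "log_data": {"user": {"type": "guest", "status": "active"}},
    "start_node_id": "11111111-1111-1111-1111-111111111111",
    "ctx": {
        "organisation_id": "6ecce9f6-58ce-4754-a278-a04558990987",
        "project_id": "7219abae-3b7d-4f62-855c-6a32632df04a",
        "pipeline_id": "e8f7e53a-2a81-491a-be83-afa5b7b1c218",
        "message_id": "msg-0001",  # illustrative
    },
}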

2. Traverse Pipeline Nodes

  • Nodes are processed in queue order (FIFO)

  • Each node type determines the processing logic:

    • Transformation Node (type: transformation):
      • Executes transform activity
      • Returns next_nodes and transformed data
      • Errors are logged via the log_failure activity but do not halt the workflow
    • Condition/Branching Node (type: condition-branching):
      • Executes filters activity
      • Determines next nodes based on conditions
      • Failures are logged but workflow continues
    • End Node (type: end):
      • Executes DestinationWorkflow as a child workflow
      • Routes data to the appropriate target queue
  • Multiple outputs per node are stored in an array to maintain full node-level results.


3. Queue Management

  • Maintains a queue of nodes to process, together with their corresponding log data
  • Appends the next nodes returned by activities to the queue
  • Ensures all paths are processed even if some nodes fail (a simplified sketch of this loop follows)
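
Taken together, steps 2 and 3 amount to a breadth-first walk over the pipeline graph. A simplified sketch of that loop, written against the Temporal Python SDK, is shown below; the payload keys, activity argument shapes, and timeout are assumptions, and failure logging via log_failure is omitted for brevity:

from collections import deque
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class TransformationWorkflow:
    @workflow.run
    async def run(self, payload: dict) -> dict:
        config = payload["pipeline_config"]  # assumed layout: node ID -> node config
        queue = deque([(payload["start_node_id"], payload["log_data"])])
        node_outputs: dict[str, list] = {}

        while queue:
            node_id, data = queue.popleft()
            node = config[node_id]

            if node["type"] == "end":
                # End nodes hand the data off to DestinationWorkflow (see step 4 below)
                continue

            # "transform" and "filters" are the activities described above; both are
            # assumed to return the (possibly transformed) data and the next node IDs
            activity_name = "transform" if node["type"] == "transformation" else "filters"
            result = await workflow.execute_activity(
                activity_name,
                args=[node, data],
                start_to_close_timeout=timedelta(minutes=1),
            )

            node_outputs.setdefault(node_id, []).append(result["data"])
            for next_id in result["next_nodes"]:
                queue.append((next_id, result["data"]))

        return node_outputs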

4. Forward to Destination Workflow

  • When an end node is reached, the workflow triggers DestinationWorkflow as a child workflow
  • The child workflow is assigned to the destination task queue (derived from the task queue name)
  • Its workflow ID is derived from the parent workflow ID for traceability (a sketch follows this list)
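
A hedged sketch of that hand-off with the Temporal Python SDK; the queue-name convention, the workflow-ID scheme, and the DestinationWorkflow argument shape are assumptions, not the actual implementation:

from datetime import timedelta

from temporalio import workflow


async def forward_to_destination(node_id: str, data: dict, ctx: dict) -> None:
    # Called from within the transformation workflow's run method
    info = workflow.info()

    # Assumed convention: swap the transformation task queue name for the destination one
    dest_queue = info.task_queue.replace("transformation", "destination")

    # Derive the child workflow ID from the parent workflow ID for traceability
    child_id = f"{info.workflow_id}-destination-{node_id}"

    await workflow.execute_child_workflow(
        "DestinationWorkflow",  # referenced by name; its signature is an assumption
        args=[{"log_data": data, "ctx": ctx, "node_id": node_id}],
        id=child_id,
        task_queue=dest_queue,
        execution_timeout=timedelta(minutes=5),
    )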

Failure Handling with MinIO

All activity failures are logged to MinIO to preserve traceability without stopping the workflow.
Each failure record includes:

  • Exception type, message, and stack trace
  • Original input data and current workflow context
  • Node ID, org, project, pipeline, and message IDs
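
A minimal sketch of what such a log_failure activity could look like with the minio Python client; the bucket name, object key layout, environment variable names, and record fields are assumptions:

import io
import json
import os

from minio import Minio
from temporalio import activity

# MinIO connection details are taken from the environment (variable names are illustrative)
client = Minio(
    os.getenv("MINIO_ENDPOINT", "localhost:9000"),
    access_key=os.getenv("MINIO_ACCESS_KEY"),
    secret_key=os.getenv("MINIO_SECRET_KEY"),
    secure=False,
)


@activity.defn
async def log_failure(failure: dict) -> None:
    # `failure` is assumed to carry the exception type, message, stack trace, the
    # original input data, and the context IDs (org, project, pipeline, message, node)
    body = json.dumps(failure).encode()
    object_name = (
        f"failures/{failure['pipeline_id']}/"
        f"{failure['message_id']}/{failure['node_id']}.json"
    )
    client.put_object(
        "workflow-failures",  # illustrative bucket name
        object_name,
        io.BytesIO(body),
        length=len(body),
        content_type="application/json",
    )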

Node Outputs

  • Each node may produce multiple outputs during workflow execution
  • Outputs are stored in the node_outputs dictionary, keyed by node ID
  • Provides a complete record of all transformations and conditional branches (an example follows)
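
For instance, after a run through the example nodes shown later on this page, the dictionary might look roughly like this (values are illustrative):

node_outputs = {
    # condition-branching node: log passed through unchanged
    "11111111-1111-1111-1111-111111111111": [
        {"user": {"type": "guest", "status": "active"}},
    ],
    # transformation node: add_key set $.processing.level to "guest"
    "44444444-4444-4444-4444-444444444444": [
        {"user": {"type": "guest", "status": "active"}, "processing": {"level": "guest"}},
    ],
}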

Detailed Logic of Activities

transform Activity

The transform activity currently supports JSON-specific operations: add_key and remove_key. It is designed to be extensible, so additional transformation functions can be added in the future.

Workflow Steps:

  1. Retrieve the transformation_fn key from the node configuration.
  2. Verify that the specified transformation function exists.
    • If the function does not exist, raise an error.
  3. Execute the transformation function using the provided log data as input (a condensed sketch of this logic follows).
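
A condensed sketch of that lookup-and-dispatch logic; the registry layout, function signatures, and the simplified JSONPath handling are assumptions:

from temporalio import activity


def add_key(data: dict, key: str, value) -> dict:
    # Minimal add_key: treats the key as a "$."-prefixed dotted path (illustrative only)
    parts = key.removeprefix("$.").split(".")
    target = data
    for part in parts[:-1]:
        target = target.setdefault(part, {})
    target[parts[-1]] = value
    return data


def remove_key(data: dict, key: str) -> dict:
    parts = key.removeprefix("$.").split(".")
    target = data
    for part in parts[:-1]:
        target = target.get(part, {})
    target.pop(parts[-1], None)
    return data


# Registry keyed by the transformation_fn value from the node config; extensible over time
TRANSFORMATIONS = {
    "Transformation.JSON.add_key": add_key,
    "Transformation.JSON.remove_key": remove_key,
}


@activity.defn
async def transform(node_config: dict, log_data: dict) -> dict:
    fn_name = node_config["transformation_fn"]
    fn = TRANSFORMATIONS.get(fn_name)
    if fn is None:
        # Unknown transformation function: raise so the failure can be logged upstream
        raise ValueError(f"Unknown transformation function: {fn_name}")
    transformed = fn(log_data, **node_config.get("transformation_params", {}))
    return {"data": transformed, "next_nodes": node_config.get("next", [])}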

Example Node Config:

{
  "nodeConfig": {
    "name": "Guest User Limitations",
    "type": "transformation",
    "transformation_fn": "Transformation.JSON.add_key",
    "transformation_params": {
      "key": "$.processing.level",
      "value": "guest"
    },
    "input_type": "json",
    "output_type": "json",
    "next": [
      "10101010-1010-1010-1010-101010101010"
    ]
  },
  "nodeId": "44444444-4444-4444-4444-444444444444",
  "pipelineId": "e8f7e53a-2a81-491a-be83-afa5b7b1c218",
  "projectId": "7219abae-3b7d-4f62-855c-6a32632df04a",
  "organisationId": "6ecce9f6-58ce-4754-a278-a04558990987"
}
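
With that config, a log flowing through the node gains the nested processing.level key, roughly like this (input values are illustrative):

before = {"user": {"type": "guest"}}
# add_key with key "$.processing.level" and value "guest" produces:
after = {"user": {"type": "guest"}, "processing": {"level": "guest"}}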

filters Activity

The filters activity is used for branching and conditional routing between nodes in the pipeline.

Workflow Steps:

  1. Retrieve the filters key from the node configuration.
  2. Iterate over each filter rule.
  3. For rules that do not define a filter (such as a catch-all all rule), consider them as passed by default.
  4. For rules with a key, evaluate them using the rule engine to determine if they pass.
  5. If a filter passes, collect all nodes listed under its next key and add them to the next nodes list.
  6. Deduplicate the list of next nodes to avoid duplicate processing (a condensed sketch of this loop follows the list).
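
A condensed sketch of that loop; the rule-engine internals are abstracted behind a hypothetical evaluate_filter helper, and the return shape is an assumption:

from temporalio import activity


def evaluate_filter(filter_def: dict, log_data: dict) -> bool:
    # Hypothetical stand-in for the rule engine: evaluates and/or groups of checks
    ...


@activity.defn
async def filters(node_config: dict, log_data: dict) -> dict:
    next_nodes: list[str] = []
    for rule in node_config.get("filters", {}).values():
        filter_def = rule.get("filter")
        # Rules without a filter definition (e.g. an "all" catch-all) pass by default
        passed = True if filter_def is None else evaluate_filter(filter_def, log_data)
        if passed:
            next_nodes.extend(rule.get("next", []))
    # Deduplicate while preserving order so no node is queued twice
    return {"data": log_data, "next_nodes": list(dict.fromkeys(next_nodes))}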

Example Node Config:

{
  "nodeConfig": {
    "name": "Validate Input Data",
    "type": "condition-branching",
    "filters": {
      "valid-authenticated-user": {
        "filter": {
          "and": [
            {
              "check": {
                "key": "$.user.id",
                "value": null,
                "operator": "ne"
              }
            },
            {
              "check": {
                "key": "$.user.status",
                "value": "active",
                "operator": "et"
              }
            },
            {
              "check": {
                "key": "$.user.email_verified",
                "value": true,
                "operator": "et"
              }
            }
          ]
        },
        "next": [
          "33333333-3333-3333-3333-333333333333"
        ]
      },
      "guest-user": {
        "filter": {
          "and": [
            {
              "check": {
                "key": "$.user.type",
                "value": "guest",
                "operator": "et"
              }
            },
            {
              "check": {
                "key": "$.user.permissions.guest_allowed",
                "value": true,
                "operator": "et"
              }
            }
          ]
        },
        "next": [
          "44444444-4444-4444-4444-444444444444"
        ]
      },
      "suspended-user": {
        "filter": {
          "or": [
            {
              "check": {
                "key": "$.user.status",
                "value": "suspended",
                "operator": "et"
              }
            },
            {
              "check": {
                "key": "$.user.violations",
                "value": 3,
                "operator": "gte"
              }
            }
          ]
        },
        "next": [
          "55555555-5555-5555-5555-555555555555"
        ]
      },
      "all": {
        "next": [
          "66666666-6666-6666-6666-666666666666"
        ]
      }
    }
  },
  "nodeId": "11111111-1111-1111-1111-111111111111",
  "pipelineId": "e8f7e53a-2a81-491a-be83-afa5b7b1c218",
  "projectId": "7219abae-3b7d-4f62-855c-6a32632df04a",
  "organisationId": "6ecce9f6-58ce-4754-a278-a04558990987"
}
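
Working through that config with a sample event (assuming checks against missing keys evaluate to false):

log = {"user": {"type": "guest", "status": "active", "permissions": {"guest_allowed": True}}}

# valid-authenticated-user: fails  ($.user.id and $.user.email_verified are missing)
# guest-user:               passes -> 44444444-4444-4444-4444-444444444444
# suspended-user:           fails  (status is not "suspended", no violations recorded)
# all:                      has no filter, passes by default -> 66666666-6666-6666-6666-666666666666
next_nodes = [
    "44444444-4444-4444-4444-444444444444",
    "66666666-6666-6666-6666-666666666666",
]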

TransformationWorkflow Summary

The TransformationWorkflow handles detailed node-level processing of logs:

  • Processes transformation nodes through JSON operations such as add_key and remove_key.
  • Applies filters and branching for conditional workflow execution.
  • Triggers DestinationWorkflow for end nodes.
  • Keeps outputs per node for auditing and traceability.
  • Logs activity-level failures to MinIO without stopping other nodes.

This design gives you modular, fault-tolerant processing that allows high-throughput log transformations with complete observability.