Using LangGraph Agents to Orchestrate Data Workflows

By Deiby Gómez

If you want your data workflows to make decisions on the fly, this post shows how to do it with LangGraph Agents. You’ll see how they compare to traditional tools and how to set them up in your projects. 

In data engineering and intelligent systems, complex pipelines have traditionally relied on tools like Apache Airflow, Dagster, or Prefect. These platforms are powerful but require predefined DAG structures and lack the dynamic, decision-based flow that modern AI-enabled pipelines need.

We’ll explore another alternative: using LangGraph Agents to orchestrate data workflows. Powered by LangChain, LangGraph enables the construction of stateful, reactive agents capable of making real-time decisions based on conditions, errors, or external feedback. We’ll demonstrate how to orchestrate a pipeline that includes data ingestion (e.g., Fivetran), transformation using dbt, retry logic for failed tasks, and alerting mechanisms, all through a flexible, agent-based architecture.

This approach blurs the line between orchestration and reasoning, allowing for intelligent pipeline control that adapts like a human operator. The image below shows the logic I want to orchestrate with LangGraph. This article provides the foundational structure of the solution. You can enrich the code by integrating your tools and requirements.

[Figure: workflow diagram of the pipeline logic orchestrated with LangGraph]

Start by importing the required libraries:

import os                 # credentials/configuration for your own integrations
import subprocess         # used if you shell out to the dbt CLI
import requests           # used if you call an ingestion API such as Fivetran
from typing import TypedDict, Literal
from langgraph.graph import END, StateGraph
from langchain_core.runnables import RunnableLambda

At the heart of this implementation is the AgentState, a custom dictionary structure that stores the agent’s current status and retry information. This ensures state persistence and control flow based on context and conditions, which is critical for handling retries and complex transitions:

# Agent State definition
class AgentState(TypedDict):
    status: Literal["start", "dataingestion", "verify_count", "clear_ingestion", "dbt_bronze", "dbt_gold", "done", "failed"]
    dataingestion_retry: int
    dataingestion_retry_max: int

This node initiates a job to extract data. If it fails, it increments the retry counter and either retries or moves to the “failed” state.

# Execute Data Extraction Job
def run_dataingestion(state: AgentState) -> AgentState:
    try:
        #YOUR CODE HERE (e.g., trigger the extraction job)
        print("Data ingestion: Succeeded")
        return {**state, "status": "verify_count"}
    except Exception as e:
        print("Data ingestion: Failed")
        retries = state["dataingestion_retry"] + 1
        if retries < state["dataingestion_retry_max"]:
            return {**state, "status": "dataingestion", "dataingestion_retry": retries}
        else:
            return {**state, "status": "failed", "dataingestion_retry": retries}
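To make the placeholder concrete, the sketch below triggers a Fivetran connector sync through its REST API and raises on failure so the except branch above takes over. The connector ID, API key, and secret are hypothetical environment variables, and you should verify the endpoint against the current Fivetran API documentation:

# Hypothetical sketch: trigger a Fivetran connector sync from run_dataingestion.
# FIVETRAN_CONNECTOR_ID, FIVETRAN_API_KEY and FIVETRAN_API_SECRET are placeholder
# environment variables; check the endpoint against Fivetran's REST API docs.
def trigger_fivetran_sync() -> None:
    connector_id = os.environ["FIVETRAN_CONNECTOR_ID"]
    resp = requests.post(
        f"https://api.fivetran.com/v1/connectors/{connector_id}/sync",
        auth=(os.environ["FIVETRAN_API_KEY"], os.environ["FIVETRAN_API_SECRET"]),
        timeout=30,
    )
    resp.raise_for_status()  # let run_dataingestion's except branch handle retries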

This node compares the row counts between the source (Oracle in this example) and Snowflake to ensure the data was transferred correctly. If there is a mismatch and retries remain, it re-attempts ingestion. If retries are exhausted, it routes to the fallback clean-up step.

# Verify rowcount between Oracle and Snowflake
def verify_data_counts(state: AgentState) -> AgentState:
    try:
        oracle_count = 10 #example
        sf_count = 10     #example

        #YOUR CODE HERE
        if oracle_count == sf_count:
            print("Rowcounts verification: Succeeded.")
            return {**state, "status": "dbt_bronze"}
        else:
            print("Rowcounts verification: Failed.")
            if state["dataingestion_retry"] >= state["dataingestion_retry_max"]:
                return {**state, "status": "failed"}
            else:
                return {**state, "status": "dataingestion", "dataingestion_retry": state["dataingestion_retry"] + 1}

    except Exception as e:
        print("Error comparando counts:", e)
        return {**state, "status": "failed"}
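If you want the counts to come from the real systems, one option is to query both sides directly. This is a minimal sketch assuming the oracledb and snowflake-connector-python drivers and a hypothetical table name; wire the connection details to your own configuration:

# Hypothetical sketch of the count queries used inside verify_data_counts.
# Assumes the oracledb and snowflake-connector-python packages; connection
# parameters and the table name are placeholders.
import oracledb
import snowflake.connector

def get_counts(table: str = "MY_TABLE") -> tuple:
    with oracledb.connect(user="...", password="...", dsn="...") as ora_conn:
        with ora_conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            oracle_count = cur.fetchone()[0]
    with snowflake.connector.connect(account="...", user="...", password="...") as sf_conn:
        with sf_conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            sf_count = cur.fetchone()[0]
    return oracle_count, sf_count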

The same function serves both dbt nodes: it runs the bronze layer first and then the gold layer, based on the current status. A failure in either triggers a notification and halts the pipeline.

# Execute DBT Job
def run_dbt(state: AgentState) -> AgentState:
    try:
        if state["status"] == "dbt_bronze":
            #YOUR CODE HERE (run the bronze layer)
            print("DBT Bronze: Succeeded")
            # If succeeded, move on to the gold layer
            return {**state, "status": "dbt_gold"}
        else:
            #YOUR CODE HERE (run the gold layer)
            print("DBT Gold: Succeeded")
            # If succeeded, the pipeline is done
            return {**state, "status": "done"}
    except Exception as e:
        if state["status"] == "dbt_bronze":
            print("DBT Bronze: Failed")
        else:
            print("DBT Gold: Failed")
        return {**state, "status": "failed"}
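One straightforward way to fill in the dbt placeholders is to shell out to the dbt CLI with the subprocess module imported earlier. The sketch assumes your models are tagged bronze and gold; adjust the selector to match your project:

# Hypothetical sketch: run one dbt layer from run_dbt via the dbt CLI.
# Assumes models are tagged "bronze" / "gold"; adjust --select to your project.
def run_dbt_layer(layer: str) -> None:
    result = subprocess.run(
        ["dbt", "run", "--select", f"tag:{layer}"],
        capture_output=True,
        text=True,
    )
    print(result.stdout)
    if result.returncode != 0:
        # Raise so run_dbt's except branch marks the state as failed
        raise RuntimeError(f"dbt {layer} run failed:\n{result.stderr}")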

This optional node is executed if validation fails after all retries. It allows the agent to clean the last ingestion state before restarting the pipeline.

# Clearing Data Extraction
def clear_dataingestion(state: AgentState) -> AgentState:
    try:
        #YOUR CODE HERE
        print("Clearing data ingestion: Succeeded")
        return {**state, "status": "dataingestion"}
    except Exception as e:
        print("Clearing data ingestion: Failed")
        return {**state, "status": "failed"}
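What “clearing” means depends on how you load the data. As one hedged example, if the ingestion lands in a Snowflake staging table, the placeholder could simply truncate that table before the next attempt (the table name and connection parameters below are placeholders):

# Hypothetical sketch for clear_dataingestion: truncate a Snowflake staging table.
import snowflake.connector

def truncate_staging(table: str = "STAGING.MY_TABLE") -> None:
    with snowflake.connector.connect(account="...", user="...", password="...") as conn:
        with conn.cursor() as cur:
            cur.execute(f"TRUNCATE TABLE IF EXISTS {table}")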

This function is called upon reaching any “failed” state. It’s a placeholder that could be extended to send real notifications through SMTP, Slack, etc.

# Email notifications
def email_notification():
    try:
        #YOUR CODE HERE
        print("Sending email to notify: Succeeded")
    except Exception as e:
        print("Sending email to notify: Failed")

The next_step(state) function controls transitions between nodes using the current status in the AgentState. It prints the state and decides the next node to execute. LangGraph uses this logic to route execution paths dynamically, a major improvement over rigid DAGs.

# Transitions
def next_step(state: AgentState) -> str:
    if state["status"] == "failed":
        print("   Next step: failed")
        print("   Data Ingestion Retries: " + str(state["dataingestion_retry"]))
        email_notification()
        return "failed"
    else:
        print("   Next step: " + str(state["status"]))
        print("   Data Ingestion Retries: " + str(state["dataingestion_retry"]))
        return state["status"]

The orchestration is assembled by defining each step as a node, and wiring transitions between them using add_conditional_edges. The graph starts from the “dataingestion” node and proceeds conditionally depending on the execution results of each node.

# Building the orchestration
orchestration = StateGraph(AgentState)
orchestration.add_node("dataingestion",         RunnableLambda(run_dataingestion))
orchestration.add_node("verify_count",          RunnableLambda(verify_data_counts))
orchestration.add_node("clear_ingestion",       RunnableLambda(clear_dataingestion))
orchestration.add_node("dbt_bronze",            RunnableLambda(run_dbt))
orchestration.add_node("dbt_gold",              RunnableLambda(run_dbt))

# Graph Transitions
orchestration.set_entry_point("dataingestion")

orchestration.add_conditional_edges("dataingestion", next_step, {
    "dataingestion": "dataingestion",
    "verify_count": "verify_count",
    "failed": END
})

orchestration.add_conditional_edges("verify_count", next_step, {
    "dbt_bronze": "dbt_gold",
    "failed": "clear_ingestion"
})

orchestration.add_conditional_edges("dbt_bronze", next_step, {
    "dbt_gold": "dbt_gold",
    "failed": END
})

orchestration.add_conditional_edges("dbt_gold", next_step, {
    "dbt_gold": "dbt_gold",
    "done": END,
    "failed": END
})

graph = orchestration.compile()
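If you want to reproduce a diagram like the one shown earlier, recent LangGraph versions can render the compiled graph as Mermaid text (the exact drawing helpers vary by version):

# Optional: print a Mermaid diagram of the compiled graph
# (availability of the drawing helpers depends on your langgraph version).
print(graph.get_graph().draw_mermaid())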

The graph.invoke() call kicks off the workflow by supplying the initial state. The execution proceeds through the graph based on runtime logic until it reaches the END state, printing the final outcome.

# Execution of the Graph
final_state = graph.invoke({
    "status": "start",
    "dataingestion_retry": 0,
    "dataingestion_retry_max": 3
})

print("Final state:", final_state)

Next, we test the code.

Output when all the steps in the graph are successful:

Data ingestion: Succeeded
   Next step: verify_count
   Data Ingestion Retries: 0
Rowcounts verification: Succeeded.
   Next step: dbt_bronze
   Data Ingestion Retries: 0
DBT Bronze: Succeeded
   Next step: dbt_gold
   Data Ingestion Retries: 0
DBT Gold: Succeeded
   Next step: done
   Data Ingestion Retries: 0
Final state: {'status': 'done', 'dataingestion_retry': 0, 'dataingestion_retry_max': 3}

Output when the data ingestion step fails twice and then succeeds on the third attempt:

Data ingestion: Failed
   Next step: dataingestion
   Data Ingestion Retries: 1
Data ingestion: Failed
   Next step: dataingestion
   Data Ingestion Retries: 2
Data ingestion: Succeeded
   Next step: verify_count
   Data Ingestion Retries: 2
Rowcounts verification: Succeeded.
   Next step: dbt_bronze
   Data Ingestion Retries: 2
DBT Bronze: Succeeded
   Next step: dbt_gold
   Data Ingestion Retries: 2
DBT Gold: Succeeded
   Next step: done
   Data Ingestion Retries: 2
Final state: {'status': 'done', 'dataingestion_retry': 2, 'dataingestion_retry_max': 3}

Output when the data ingestion step fails three times and exhausts its retries:

Data ingestion: Failed
   Next step: dataingestion
   Data Ingestion Retries: 1
Data ingestion: Failed
   Next step: dataingestion
   Data Ingestion Retries: 2
Data ingestion: Failed
   Next step: failed
   Data Ingestion Retries: 3
Final state: {'status': 'failed', 'dataingestion_retry': 3, 'dataingestion_retry_max': 3}

LangGraph marks a shift in pipeline orchestration — from rigid DAGs to intelligent, condition‑aware agents. With LLMs and LangGraph’s stateful agent design, you’re no longer bound to static execution graphs. You can build adaptive, self‑recovering workflows that respond to success, failure, or uncertainty.

While Airflow and Dagster excel at traditional scheduling, LangGraph powers a new kind of orchestration — one that thinks, adapts, and decides. Whether automating data ingestion, chaining DBT models, or running real‑time feedback loops, LangGraph lets you orchestrate with intelligence.

If you’re ready to add reasoning to your pipelines, move beyond traditional DAGs toward AI‑driven orchestration.



Deiby Gómez was the first Oracle ACE Director of Guatemala (2015-2023). He holds 35 technology certifications, among them the highest certification for Oracle Database (Oracle Certified Master, OCM) and three advanced Snowflake certifications (Architect, Administrator, and Data Engineer). He is an engineer with two master’s degrees and a future lawyer. Deiby has been a speaker at technology events in more than 12 countries, including Oracle Open World in San Francisco. Most recently, Deiby was selected as a Snowflake Squad Member by Snowflake in 2025.


Is your organization evolving to stay competitive?
The Architect Room helps you design a digital strategy that leverages current tech investments and prepares for what’s next.

We work across all major public cloud platforms to deliver scalable, practical solutions. 🔗 Visit Red Pill Analytics
📱 Connect with us on Twitter, Facebook, or LinkedIn.
📚 For more data tips and insights, check out our blog on Medium.
