By Deiby Gómez
If you want your data workflows to make decisions on the fly, this post shows how to do it with LangGraph Agents. You’ll see how they compare to traditional tools and how to set them up in your projects.
In data engineering and intelligent systems, complex pipelines have traditionally relied on tools like Apache Airflow, Dagster, or Prefect. These platforms are powerful but require predefined DAG structures and lack the dynamic, decision-based flow that modern AI-enabled pipelines need.
We’ll explore an alternative: using LangGraph Agents to orchestrate data workflows. Powered by LangChain, LangGraph enables the construction of stateful, reactive agents capable of making real-time decisions based on conditions, errors, or external feedback. We’ll demonstrate how to orchestrate a pipeline that includes data ingestion (e.g., Fivetran), transformation using dbt, retry logic for failed tasks, and alerting mechanisms, all through a flexible, agent-based architecture.
This approach blurs the line between orchestration and reasoning, allowing for intelligent pipeline control that adapts like a human operator. The image below shows the logic I want to orchestrate with LangGraph. This article provides the foundational structure of the solution. You can enrich the code by integrating your tools and requirements.
Start by importing the required libraries:
import os           # environment variables for credentials
import subprocess   # shelling out to the dbt CLI
import requests     # REST calls to the ingestion tool's API
from typing import TypedDict, Literal
from langgraph.graph import END, StateGraph
from langchain_core.runnables import RunnableLambda
At the heart of this implementation is the AgentState, a custom dictionary structure that stores the agent’s current status and retry information. This ensures state persistence and control flow based on context and conditions, which is critical for handling retries and complex transitions:
# Agent State definition
class AgentState(TypedDict):
    # Current step/status the agent is in
    status: Literal["start", "dataingestion", "verify_count", "clear_ingestion", "dbt_bronze", "dbt_gold", "done", "failed"]
    # Retries used so far and the maximum allowed
    dataingestion_retry: int
    dataingestion_retry_max: int
This node initiates a job to extract data. If it fails, it increments the retry counter and either retries or moves to the “failed” state.
# Execute Data Extraction Job
def run_dataingestion(state: AgentState) -> AgentState:
    try:
        #YOUR CODE HERE
        print("Data ingestion: Succeeded")
        return {**state, "status": "verify_count"}
    except Exception as e:
        print("Data ingestion: Failed")
        state["dataingestion_retry"] = state["dataingestion_retry"] + 1
        if state["dataingestion_retry"] < state["dataingestion_retry_max"]:
            return {**state, "status": "dataingestion"}
        else:
            return {**state, "status": "failed"}
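In a real pipeline, the try block would trigger your ingestion tool before reporting success. As a minimal sketch, assuming Fivetran’s REST API sync endpoint and placeholder credentials read from the environment, a hypothetical helper could look like this:
# Hypothetical helper: trigger a Fivetran connector sync via its REST API.
# The connector ID and the FIVETRAN_API_KEY / FIVETRAN_API_SECRET environment
# variables are placeholders for your own setup.
def trigger_fivetran_sync(connector_id: str) -> None:
    response = requests.post(
        f"https://api.fivetran.com/v1/connectors/{connector_id}/sync",
        auth=(os.environ["FIVETRAN_API_KEY"], os.environ["FIVETRAN_API_SECRET"]),
    )
    # Raise on HTTP errors so run_dataingestion's except block handles the retry
    response.raise_for_status()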
This node compares the row counts between the source (Oracle, in this example) and Snowflake to ensure the data was transferred correctly. If there’s a mismatch and retries remain, it re-attempts ingestion. If retries are exhausted, it triggers a fallback clean-up step.
# Verify rowcount between Oracle and Snowflake
def verify_data_counts(state: AgentState) -> AgentState:
    try:
        oracle_count = 10  # example
        sf_count = 10  # example
        #YOUR CODE HERE
        if oracle_count == sf_count:
            print("Rowcounts verification: Succeeded.")
            return {**state, "status": "dbt_bronze"}
        else:
            print("Rowcounts verification: Failed.")
            if state["dataingestion_retry"] >= state["dataingestion_retry_max"]:
                return {**state, "status": "failed"}
            else:
                return {**state, "status": "dataingestion", "dataingestion_retry": state["dataingestion_retry"] + 1}
    except Exception as e:
        print("Error comparing counts:", e)
        return {**state, "status": "failed"}
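The counts above are hard-coded for illustration. A minimal sketch of the real comparison, assuming the python-oracledb and snowflake-connector-python packages and placeholder connection parameters, might look like this:
# Hypothetical helper: fetch row counts from Oracle and Snowflake.
# Connection parameters and the table name are placeholders.
import oracledb
import snowflake.connector

def get_row_counts(table: str) -> tuple[int, int]:
    # Count rows on the Oracle side
    with oracledb.connect(user="...", password="...", dsn="...") as ora:
        with ora.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            oracle_count = cur.fetchone()[0]
    # Count rows on the Snowflake side
    with snowflake.connector.connect(user="...", password="...", account="...") as sf:
        with sf.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            sf_count = cur.fetchone()[0]
    return oracle_count, sf_count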
These nodes execute dbt transformation layers. A failure in either triggers a notification and halts the pipeline.
# Execute DBT Job
def run_dbt(state: AgentState) -> AgentState:
    try:
        if state["status"] == 'dbt_bronze':
            #YOUR CODE HERE
            print("DBT Bronze: Succeeded")
            # If succeeded
            return {**state, "status": "dbt_gold"}
        else:
            #YOUR CODE HERE
            print("DBT Gold: Succeeded")
            # If succeeded
            return {**state, "status": "done"}
    except Exception as e:
        if state["status"] == 'dbt_bronze':
            print("DBT Bronze: Failed")
        else:
            print("DBT Gold: Failed")
        return {**state, "status": "failed"}
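The subprocess import from earlier suggests the simplest dbt integration: shelling out to the dbt CLI. Here is a hedged sketch, assuming dbt is installed and your project selects models by the (hypothetical) names bronze and gold:
# Hypothetical helper: run one dbt layer through the CLI.
# The --select values assume your project groups models as "bronze" and "gold".
def run_dbt_layer(layer: str) -> None:
    result = subprocess.run(
        ["dbt", "run", "--select", layer],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Surface dbt's output so run_dbt's except block routes to "failed"
        raise RuntimeError(result.stdout + result.stderr)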
This optional node is executed if validation fails after all retries. It allows the agent to clean the last ingestion state before restarting the pipeline.
# Clearing Data Extraction
def clear_dataingestion(state: AgentState) -> AgentState:
    try:
        #YOUR CODE HERE
        print("Clearing data ingestion: Succeeded")
        return {**state, "status": "dataingestion"}
    except Exception as e:
        print("Clearing data ingestion: Failed")
        return {**state, "status": "failed"}
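What “clearing” means depends on your stack. One hedged possibility, assuming the partially loaded rows land in a Snowflake staging table, is simply truncating that table before the retry:
# Hypothetical helper: drop partially loaded rows from a Snowflake staging table.
# Connection parameters and the table name are placeholders.
import snowflake.connector

def truncate_staging_table(table: str) -> None:
    with snowflake.connector.connect(user="...", password="...", account="...") as sf:
        with sf.cursor() as cur:
            cur.execute(f"TRUNCATE TABLE IF EXISTS {table}")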
This function is called upon reaching any “failed” state. It’s a placeholder that could be extended to send real notifications through SMTP, Slack, etc.
# Email notifications
def email_notification():
    try:
        #YOUR CODE HERE
        print("Sending email to notify: Succeeded")
    except Exception as e:
        print("Sending email to notify: Failed")
The next_step(state) function controls transitions between nodes using the current status in the AgentState. It prints the state and decides the next node to execute. LangGraph uses this logic to route execution paths dynamically, a major improvement over rigid DAGs.
# Transitions
def next_step(state: AgentState) -> str:
    if state["status"] == 'failed':
        print("Next step: failed")
        print("Data Ingestion Retries: " + str(state["dataingestion_retry"]))
        email_notification()
        return "failed"
    else:
        print("Next step: " + str(state["status"]))
        print("Data Ingestion Retries: " + str(state["dataingestion_retry"]))
        return state["status"]
The orchestration is assembled by defining each step as a node, and wiring transitions between them using add_conditional_edges. The graph starts from the “dataingestion” node and proceeds conditionally depending on the execution results of each node.
# Building the orchestration
orchestration = StateGraph(AgentState)
orchestration.add_node("dataingestion", RunnableLambda(run_dataingestion))
orchestration.add_node("verify_count", RunnableLambda(verify_data_counts))
orchestration.add_node("clear_ingestion", RunnableLambda(clear_dataingestion))
orchestration.add_node("dbt_bronze", RunnableLambda(run_dbt))
orchestration.add_node("dbt_gold", RunnableLambda(run_dbt))
# Graph Transitions
orchestration.set_entry_point("dataingestion")
orchestration.add_conditional_edges("dataingestion", next_step, {
    "dataingestion": "dataingestion",
    "verify_count": "verify_count",
    "failed": END
})
orchestration.add_conditional_edges("verify_count", next_step, {
    "dataingestion": "dataingestion",
    "dbt_bronze": "dbt_bronze",
    "failed": "clear_ingestion"
})
orchestration.add_conditional_edges("clear_ingestion", next_step, {
    "dataingestion": "dataingestion",
    "failed": END
})
orchestration.add_conditional_edges("dbt_bronze", next_step, {
    "dbt_gold": "dbt_gold",
    "failed": END
})
orchestration.add_conditional_edges("dbt_gold", next_step, {
    "done": END,
    "failed": END
})
graph = orchestration.compile()
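Before running anything, it is worth confirming that the wiring matches the intended diagram. Recent LangGraph versions can render the compiled topology as a Mermaid diagram, which makes routing mistakes easy to spot:
# Print a Mermaid diagram of the compiled graph to verify the transitions
print(graph.get_graph().draw_mermaid())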
The graph.invoke() call kicks off the workflow by supplying the initial state. The execution proceeds through the graph based on runtime logic until it reaches the END state, printing the final outcome.
# Execution of the Graph
final_state = graph.invoke({
"status": "start",
"dataingestion_retry": 0,
"dataingestion_retry_max": 3
})
print("Final state:", final_state)
Next, we test the code.
Output when all the steps in the graph succeed:
Data ingestion: Succeeded
Next step: verify_count
Data Ingestion Retries: 0
Rowcounts verification: Succeeded.
Next step: dbt_bronze
Data Ingestion Retries: 0
DBT Bronze: Succeeded
Next step: dbt_gold
Data Ingestion Retries: 0
DBT Gold: Succeeded
Next step: done
Data Ingestion Retries: 0
Final state: {'status': 'done', 'dataingestion_retry': 0, 'dataingestion_retry_max': 3}
Output when the Data Ingestion step fails twice, then succeeds and the pipeline continues:
Data ingestion: Failed
Next step: dataingestion
Data Ingestion Retries: 1
Data ingestion: Failed
Next step: dataingestion
Data Ingestion Retries: 2
Data ingestion: Succeeded
Next step: verify_count
Data Ingestion Retries: 2
Rowcounts verification: Succeeded.
Next step: dbt_bronze
Data Ingestion Retries: 2
DBT Bronze: Succeeded
Next step: dbt_gold
Data Ingestion Retries: 2
DBT Gold: Succeeded
Next step: done
Data Ingestion Retries: 2
Final state: {'status': 'done', 'dataingestion_retry': 2, 'dataingestion_retry_max': 3}
Output when the Data Ingestion step exhausts all three retries:
Data ingestion: Failed
Next step: dataingestion
Data Ingestion Retries: 1
Data ingestion: Failed
Next step: dataingestion
Data Ingestion Retries: 2
Data ingestion: Failed
Next step: failed
Data Ingestion Retries: 3
Sending email to notify: Succeeded
Final state: {'status': 'failed', 'dataingestion_retry': 3, 'dataingestion_retry_max': 3}
LangGraph marks a shift in pipeline orchestration — from rigid DAGs to intelligent, condition‑aware agents. With LLMs and LangGraph’s stateful agent design, you’re no longer bound to static execution graphs. You can build adaptive, self‑recovering workflows that respond to success, failure, or uncertainty.
While Airflow and Dagster excel at traditional scheduling, LangGraph powers a new kind of orchestration — one that thinks, adapts, and decides. Whether automating data ingestion, chaining DBT models, or running real‑time feedback loops, LangGraph lets you orchestrate with intelligence.
If you’re ready to add reasoning to your pipelines, move beyond traditional DAGs toward AI‑driven orchestration.
Deiby Gómez was the first Oracle ACE Director of Guatemala (2015-2023). He holds 35 technology certifications, among them the highest certification for Oracle Database (Oracle Certified Master, OCM) and three advanced Snowflake certifications (Architect, Administrator, and Data Engineer). He is an engineer with two master’s degrees and a future lawyer. Deiby has spoken at technology events in more than 12 countries, including Oracle OpenWorld in San Francisco. Most recently, he was selected as a Snowflake Squad member by Snowflake in 2025.
Is your organization evolving to stay competitive?
The Architect Room helps you design a digital strategy that leverages current tech investments and prepares for what’s next.
We work across all major public cloud platforms to deliver scalable, practical solutions. 🔗 Visit Red Pill Analytics
📱Connect with us on Twitter, Facebook, or LinkedIn.
📚 For more data tips and insights, check out our blog on Medium