Building Long-Running TTS Pipelines with LangGraph: Orchestrating Long-Form Audio Generation
Introduction
Generating long-form audio content—audiobooks spanning hours, educational courses, or extended podcasts—presents unique challenges: API rate limits, network failures, resource constraints, and the sheer duration of processing. This article explores a production-ready architecture for long-running TTS pipelines that can gracefully handle long-form generation tasks, resume after failures, and maintain state across distributed systems.
Built with LangGraph, the system orchestrates complex workflows involving AI content generation (DeepSeek), text-to-speech conversion (OpenAI TTS), and distributed storage (Cloudflare R2). The key innovation: PostgreSQL checkpointing enables resumable execution, making it possible to generate 5-30+ minute audio segments reliably, even when individual API calls or processing steps fail.
The Challenge: Long-Form Audio at Scale
Why Long-Running Pipelines Are Hard
Traditional TTS approaches fail at scale:
- Time Constraints: A 30-minute audio narrative requires ~4,500 words, chunked into 10-15 API calls, taking 2-5 minutes to generate
- Failure Points: Each step (text generation, chunking, TTS, storage) can fail independently
- Memory Pressure: Holding all audio segments in memory for hours is impractical
- Cost Management: Retrying from scratch wastes API credits and compute time
- State Loss: Without persistence, crashes mean starting over
Our Solution: Stateful Orchestration
- LangGraph manages workflow state transitions
- PostgreSQL persists checkpoints after each successful step
- R2 provides durable storage for completed segments
- Resumable execution using thread_id for job recovery
System Overview
The pipeline orchestrates three main workflows:
- Research Generation: Structured content research using DeepSeek
- Narrative Text Generation: Long-form content creation with context awareness
- Audio Synthesis: Text-to-speech conversion with OpenAI TTS and Cloudflare R2 storage
Tech Stack
- LangGraph: State machine orchestration with built-in checkpointing
- DeepSeek: Long-form text generation (deepseek-chat, 2,500+ token outputs)
- OpenAI TTS: Streaming audio synthesis (gpt-4o-mini-tts, 4,096-character limit per request)
- PostgreSQL: Durable checkpointing for long-running jobs (Neon serverless in production)
- Cloudflare R2: S3-compatible storage with zero egress fees (critical for multi-GB audio)
- FastAPI: Async REST API for non-blocking long operations
- Docker: Containerized deployment with ffmpeg for audio merging
Why This Stack for Long-Running Jobs:
- Postgres checkpointing: Resume from any point in the workflow (text generation → chunking → TTS → upload)
- Streaming TTS: Memory-efficient direct-to-disk writes (no buffering entire audio in RAM)
- R2 durability: Segments uploaded immediately, survive process crashes
- Async execution: Non-blocking background processing for hours-long jobs
Architecture Patterns
1. Core LangGraph State Machine
The system implements three distinct LangGraph workflows, each optimized for specific tasks.
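Before diving into the individual pipelines, it helps to see how a workflow is assembled. The sketch below wires the text-plus-audio pipeline with LangGraph's StateGraph API; the node functions are described later in this article, while node_save_results and the exact node names are illustrative assumptions rather than the system's literal wiring.
from langgraph.graph import StateGraph, START, END

def build_graph(checkpointer=None):
    g = StateGraph(TextState)

    # One node per pipeline stage (implementations appear later in this article;
    # node_save_results is assumed here for the final database write)
    g.add_node("generate_text", node_generate_text)
    g.add_node("chunk_text", node_chunk_text)
    g.add_node("generate_audio", node_generate_audio)
    g.add_node("save_results", node_save_results)

    # Entry point: full generation vs. audio-only mode
    g.add_conditional_edges(
        START,
        should_skip_text_generation,
        {"generate_text": "generate_text", "chunk_text": "chunk_text"},
    )
    # After text generation: synthesize audio or stop (text-only mode)
    g.add_conditional_edges(
        "generate_text",
        should_generate_audio,
        {"chunk_text": "chunk_text", END: END},
    )
    g.add_edge("chunk_text", "generate_audio")
    g.add_edge("generate_audio", "save_results")
    g.add_edge("save_results", END)

    # Compiling with a checkpointer turns every node transition into a durable recovery point
    return g.compile(checkpointer=checkpointer)
The routing helpers (should_skip_text_generation, should_generate_audio) are shown in the text generation pipeline below.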
2. Research Generation Pipeline
The research pipeline generates structured research content using a focused LangGraph workflow; a minimal node sketch follows the feature list below.
Key Features:
- Low temperature (0.3) for factual accuracy
- Structured JSON output with validation
- Evidence level classification (A/B/C)
- Relevance scoring for topic matching
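A research node consistent with these features might look like the following sketch. The node name, prompt wording, and the 0.5 relevance cutoff are illustrative assumptions; the essential parts are the temperature of 0.3 and validating the structured JSON before it enters state.
import json
from langchain_core.messages import HumanMessage
from langchain_deepseek import ChatDeepSeek

async def node_generate_research(state: dict) -> dict:
    # Low temperature favors factual, reproducible output
    llm = ChatDeepSeek(model="deepseek-chat", temperature=0.3)

    prompt = (
        f"Research the topic '{state['title']}'. Return a JSON array of items, "
        "each with: summary, evidence_level (A/B/C), relevance_score (0-1)."
    )
    resp = await llm.ainvoke([HumanMessage(content=prompt)])

    try:
        items = json.loads(resp.content)
        # Basic validation: keep only well-formed, sufficiently relevant items
        items = [
            it for it in items
            if it.get("evidence_level") in ("A", "B", "C")
            and float(it.get("relevance_score", 0)) >= 0.5
        ]
        return {**state, "research_items": items, "error": None}
    except (json.JSONDecodeError, TypeError, ValueError) as e:
        return {**state, "research_items": [], "error": f"Research parsing failed: {e}"}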
3. Long-Form Text Generation Pipeline
The most sophisticated workflow, supporting both full generation and audio-only modes.
Conditional Routing Logic:
def should_skip_text_generation(state: TextState) -> str:
    """Route to text generation or skip to audio."""
    if state.get("existing_content") and state["existing_content"].get("text"):
        return "chunk_text"  # Audio-only mode
    return "generate_text"  # Full generation

def should_generate_audio(state: TextState) -> str:
    """Route to audio generation or end."""
    if state.get("generate_audio", True):
        return "chunk_text"
    return END  # Text-only mode
4. Audio Generation Pipeline (Standalone)
A simplified pipeline for generic long-form narration.
Iterative Chunk Processing:
The system uses a recursive edge pattern for processing chunks:
g.add_conditional_edges(
    "tts_one_chunk",
    edge_should_continue,
    {
        "tts_one_chunk": "tts_one_chunk",  # Loop back
        "finalize": "finalize",            # Exit loop
    },
)

def edge_should_continue(state: JobState) -> str:
    if state["chunk_index"] < len(state["chunks"]):
        return "tts_one_chunk"
    return "finalize"
Deep Dive: Key Architectural Components
State Management
LangGraph uses typed state dictionaries for type safety and IDE support:
from typing import TypedDict

class TextState(TypedDict):
    # Input metadata
    content_id: int
    title: str
    content_type: str
    language: str
    target_duration_minutes: int | None

    # Generation data
    research_items: list[dict]
    existing_content: dict | None
    generated_text: str | None

    # TTS fields
    voice: str
    chunks: list[str]
    segment_urls: list[str]
    manifest_url: str | None
    audio_url: str | None

    # Control flow
    generate_audio: bool
    database_saved: bool
    error: str | None
Postgres Checkpointing: The Key to Long-Running Resilience
For long-running jobs, checkpointing is non-negotiable. Without it, a network glitch at minute 25 of a 30-minute generation means restarting from scratch.
How Checkpointing Works:
import os
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def run_pipeline(state: TextState, thread_id: str):
    db_url = os.getenv("DATABASE_URL")
    async with AsyncPostgresSaver.from_conn_string(db_url) as checkpointer:
        await checkpointer.setup()  # Creates checkpoint tables on first run
        app = build_graph(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": thread_id}}

        # LangGraph automatically saves state after each node execution
        final_state = await app.ainvoke(state, config=config)
        return final_state
What Gets Checkpointed:
- Complete state dictionary after each node
- Edge transitions and routing decisions
- Timestamps and execution metadata
- Partial results (generated text, uploaded segment URLs)
Recovery Example:
# Job crashes after generating 8 of 12 TTS segments
# Resume with same thread_id:
final_state = await run_pipeline(initial_state, thread_id="job-12345")
# LangGraph:
# 1. Loads last checkpoint from Postgres
# 2. Sees 8 segments already uploaded to R2
# 3. Continues from segment 9
# 4. Completes remaining 4 segments
Production Benefits:
- Cost Savings: No wasted API calls on retry
- Time Efficiency: Resume from 80% complete, not 0%
- Reliability: Transient failures (rate limits, timeouts) don't kill long-form jobs
- Observability: Query the checkpoint table to monitor progress (see the sketch below)
- Parallel Execution: Multiple jobs with different thread_id values
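The observability bullet above does not require raw SQL against the checkpoint tables: the compiled graph can read back the latest checkpoint for a thread. A minimal progress probe, assuming the build_graph factory and environment variables used elsewhere in this article, could look like this:
import os
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def get_job_progress(thread_id: str) -> dict:
    """Read the latest checkpoint for a job and summarize its progress."""
    async with AsyncPostgresSaver.from_conn_string(os.getenv("DATABASE_URL")) as checkpointer:
        app = build_graph(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": thread_id}}

        snapshot = await app.aget_state(config)  # Latest checkpointed state, if any
        state = snapshot.values or {}

        return {
            "thread_id": thread_id,
            "chunks_total": len(state.get("chunks", [])),
            "segments_done": len(state.get("segment_urls", [])),
            "audio_url": state.get("audio_url"),
            "error": state.get("error"),
        }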
Text Chunking Algorithm: Optimizing for Long-Form Narration
For 30-minute audio (4,500+ words), naive chunking creates jarring transitions. Our algorithm balances API constraints with narrative flow:
Constraints:
- OpenAI TTS: 4,096 character limit per request
- Target: ~4,000 chars per chunk (safety margin)
- Goal: Natural pauses at paragraph/sentence boundaries
Strategy:
import re
from typing import List

def chunk_text(text: str, max_chars: int = 4000) -> List[str]:
    """
    Multi-level chunking for long-form content:
    1. Split by paragraphs (\n\n) - natural topic boundaries
    2. Accumulate paragraphs until approaching the 4K limit
    3. If a single paragraph > 4K, split by sentences
    4. If a single sentence > 4K, split mid-sentence (rare edge case)

    Result: 10-15 chunks for 30-min audio, each ending at a natural pause
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks = []
    buf = []
    for p in paragraphs:
        candidate = "\n\n".join(buf + [p]) if buf else p
        if len(candidate) <= max_chars:
            buf.append(p)
        else:
            if not buf:
                # Paragraph too large - split by sentences
                sentences = re.split(r"(?<=[.!?])\s+", p)
                # Accumulate sentences with same logic...
            else:
                chunks.append("\n\n".join(buf))
                buf = [p]
    if buf:
        chunks.append("\n\n".join(buf))  # Flush the final buffer
    return chunks
Why This Matters for Long-Form:
- Seamless Merging: Chunk boundaries at natural pauses prevent audio glitches
- Even Distribution: Avoids tiny final chunks (better for progress tracking)
- Memory Efficiency: Process one chunk at a time, not entire 4,500-word text
- Resumability: Each chunk is independent; can resume mid-sequence
OpenAI TTS Streaming
Efficient audio generation using streaming responses:
from pathlib import Path
from openai import OpenAI

async def node_tts_one_chunk(state: JobState) -> JobState:
    chunk_text = state["chunks"][state["chunk_index"]]
    segment_path = Path(f"segment_{state['chunk_index']:04d}.mp3")

    client = OpenAI()
    # Stream directly to disk (memory efficient)
    with client.audio.speech.with_streaming_response.create(
        model="gpt-4o-mini-tts",
        voice=state["voice"],
        input=chunk_text,
        response_format="mp3",
    ) as response:
        response.stream_to_file(segment_path)

    # Upload to R2
    r2_url = upload_to_r2(segment_path, state["job_id"])

    return {
        **state,
        "segment_urls": [*state["segment_urls"], r2_url],
        "chunk_index": state["chunk_index"] + 1,
    }
Audio Merging Strategy
The system uses ffmpeg for high-quality concatenation:
import subprocess

async def node_generate_audio(state: TextState) -> TextState:
    # Generate all segments...

    # Create concat list for ffmpeg
    file_list_path.write_text(
        "\n".join(f"file '{segment}'" for segment in segment_paths)
    )

    # Merge using ffmpeg (codec copy - fast and lossless)
    try:
        subprocess.run([
            "ffmpeg", "-f", "concat", "-safe", "0",
            "-i", str(file_list_path),
            "-c", "copy",  # No re-encoding
            str(merged_path),
        ], check=False)
    except FileNotFoundError:
        pass  # ffmpeg not installed

    # Fallback to binary concatenation if ffmpeg is unavailable or failed
    if not merged_path.exists():
        with open(merged_path, "wb") as merged:
            for segment in segment_paths:
                merged.write(segment.read_bytes())
Cloudflare R2 Integration
S3-compatible storage for globally distributed audio:
from pathlib import Path

import boto3
from botocore.config import Config

def get_r2_client():
    return boto3.client(
        's3',
        endpoint_url=f'https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com',
        aws_access_key_id=R2_ACCESS_KEY_ID,
        aws_secret_access_key=R2_SECRET_ACCESS_KEY,
        config=Config(signature_version='s3v4'),
    )

def upload_to_r2(file_path: Path, job_id: str) -> str:
    client = get_r2_client()
    key = f"{job_id}/{file_path.name}"
    client.put_object(
        Bucket=R2_BUCKET_NAME,
        Key=key,
        Body=file_path.read_bytes(),
        ContentType='audio/mpeg',
    )
    return f"{R2_PUBLIC_DOMAIN}/{key}"
Structured Content Generation
Narrative Architecture Framework
The system implements a flexible content framework with customizable sections; a sketch of how these sections can be assembled into a prompt scaffold follows the list.
Key Components:
- Introduction (2-3 min): Hook the listener and set expectations
- Context: Background information and relevance
- Core Content: Main topic introduction with clear structure
- Examples: Concrete illustrations and case studies
- Deep Dive: Detailed exploration of key concepts
- Applications: Practical use cases and implementation
- Advanced Topics: Nuanced discussion for engaged learners
- Synthesis: Connect all concepts together
- Takeaways: Summary of key points
- Conclusion: Clear closing and next steps
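The build_content_prompt function in the next section calls a generate_content_architecture helper whose implementation is not shown in this article. A minimal sketch, assuming each content type maps to an ordered subset of the sections above, might be:
SECTION_TEMPLATES = {
    "course": [
        "Introduction (2-3 min): hook the listener and set expectations",
        "Context: background information and relevance",
        "Core Content: main topic introduction with clear structure",
        "Examples: concrete illustrations and case studies",
        "Deep Dive: detailed exploration of key concepts",
        "Applications: practical use cases and implementation",
        "Synthesis: connect all concepts together",
        "Takeaways: summary of key points",
        "Conclusion: clear closing and next steps",
    ],
    # ... other content types with their own section orderings
}

def generate_content_architecture(content_type: str) -> str:
    """Render the section plan for a content type as a numbered prompt scaffold."""
    sections = SECTION_TEMPLATES.get(content_type, SECTION_TEMPLATES["course"])
    numbered = "\n".join(f"{i + 1}. {s}" for i, s in enumerate(sections))
    return f"STRUCTURE (follow these sections in order):\n{numbered}"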
Dynamic Content Adaptation
def build_content_prompt(state: TextState) -> str:
    minutes = state.get("target_duration_minutes") or 5
    target_words = int(minutes * 150)  # 150 words per minute narration
    content_type = state.get("content_type")

    # Select architecture based on content type
    architecture = generate_content_architecture(content_type)

    return f"""
Create a {state['language']} narrative for audio:

TOPIC: {state['title']}
TYPE: {content_type}
TARGET: {target_words} words ({minutes} minutes)

{architecture}

RESEARCH CONTEXT:
{format_research_summary(state['research_items'])}

Requirements:
- Plain text only (no markdown)
- Natural paragraph breaks
- Engaging, clear tone
- Appropriate language for audio listening
"""
API Endpoints
FastAPI Service Layer
Endpoint Implementations:
@app.post("/api/research/generate")
async def research_endpoint(req: ResearchRequest):
"""Generate research context using LangGraph + DeepSeek."""
return await generate_research(req)
@app.post("/api/text/generate")
async def text_endpoint(req: TextGenerationRequest):
"""Generate long-form text content (text-only mode)."""
return await generate_text(req)
@app.post("/api/audio/generate")
async def audio_endpoint(req: TextGenerationRequest):
"""Generate audio from existing content (audio-only mode)."""
return await generate_audio(req)
@app.post("/api/tts/generate")
async def tts_endpoint(req: TTSRequest, background_tasks: BackgroundTasks):
"""Generic TTS generation (fire-and-forget)."""
return await generate_tts(req, background_tasks)
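The fire-and-forget TTS endpoint above hands the work to a generate_tts helper. One plausible implementation, assuming a run_pipeline entry point analogous to the one in the checkpointing section and request fields such as job_id, voice, and text, schedules the job on FastAPI's BackgroundTasks and returns immediately:
import time
from fastapi import BackgroundTasks

async def generate_tts(req: TTSRequest, background_tasks: BackgroundTasks) -> dict:
    # thread_id follows the {workflow}-{entity_id}-{timestamp} convention
    thread_id = f"tts-{req.job_id}-{int(time.time())}"
    initial_state = {
        "job_id": req.job_id,
        "text": req.text,
        "voice": req.voice,
        "chunks": [],
        "chunk_index": 0,
        "segment_urls": [],
    }

    # Schedule the long-running pipeline without blocking the HTTP response
    background_tasks.add_task(run_pipeline, initial_state, thread_id)

    # The client can poll progress (or receive a webhook) using this thread_id
    return {"status": "accepted", "thread_id": thread_id}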
Deployment Architecture
Docker Containerization
FROM python:3.12-slim
WORKDIR /app
# Install ffmpeg for audio merging
RUN apt-get update && apt-get install -y ffmpeg && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 8080
# Run FastAPI server
CMD ["uvicorn", "langgraph_server:app", "--host", "0.0.0.0", "--port", "8080"]
Environment Configuration
# AI Services
DEEPSEEK_API_KEY=sk-...
OPENAI_API_KEY=sk-...
# Database (Neon Postgres)
DATABASE_URL=postgresql://user:pass@host/db?sslmode=require
# Cloudflare R2
R2_ACCOUNT_ID=...
R2_ACCESS_KEY_ID=...
R2_SECRET_ACCESS_KEY=...
R2_BUCKET_NAME=longform-tts
R2_PUBLIC_DOMAIN=https://pub-longform-tts.r2.dev
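Because a missing credential may only surface minutes into a long job, it is worth failing fast at startup. A small validation helper, assuming the variable names listed above, could run before the FastAPI app starts serving:
import os

REQUIRED_ENV_VARS = [
    "DEEPSEEK_API_KEY", "OPENAI_API_KEY", "DATABASE_URL",
    "R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY",
    "R2_BUCKET_NAME", "R2_PUBLIC_DOMAIN",
]

def validate_env() -> None:
    """Raise immediately if any required configuration is missing."""
    missing = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")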
Cloudflare Workers Deployment
# wrangler.toml
name = "langgraph-tts"
compatibility_date = "2024-01-01"
[build]
command = "docker build -t langgraph-tts ."
[[services]]
name = "langgraph-tts"
image = "langgraph-tts:latest"
[env]
PORT = "8080"
[[r2_buckets]]
binding = "LONGFORM_TTS"
bucket_name = "longform-tts"
Production Considerations
Performance Metrics for Long-Running Jobs
Benchmarks (30-minute audio generation):
| Stage | Duration | Checkpointed | Retryable |
|---|---|---|---|
| Text Generation (DeepSeek) | 30-60s | ✅ After completion | ✅ Full retry |
| Text Chunking | <1s | ✅ After completion | ✅ Instant |
| TTS Segments (1–12) | 10-20s each | ✅ After each segment | ✅ Per-segment |
| Audio Merging (ffmpeg) | 1–3s | ✅ After completion | ✅ Full retry |
| R2 Upload (merged) | 2-5s | ✅ After completion | ✅ Full retry |
| Total Pipeline | 3-5 minutes | 15+ checkpoints | Granular recovery |
Long-Running Job Profile:
# Example: 2-hour audiobook chapter
# text_length   ≈ 18,000 words
# chunks        = 45 (~4,000 chars each)
# tts_time      ≈ 45 × 15 s ≈ 11.25 minutes
# text_gen_time ≈ 2-3 minutes
# total_time    ≈ 15 minutes of processing for 2 hours of audio
#
# Checkpoint frequency:
# - 1 after text generation
# - 1 after each of the 45 TTS segments
# - 1 after the merge
# Total: 47 recovery points
Failure Recovery Times:
- Crash at 80% complete → Resume in 1–2 seconds, continue from segment 36/45
- Network timeout on segment 20 → Retry only segment 20, not segments 1–19
- Database connection loss → Reconnect and load last checkpoint (<500ms)
Error Handling & Resilience
async def node_generate_text(state: TextState) -> TextState:
    try:
        llm = ChatDeepSeek(model="deepseek-chat", temperature=0.7, max_tokens=2500)
        prompt = build_content_prompt(state)
        resp = await llm.ainvoke([HumanMessage(content=prompt)])
        text = clean_for_tts(resp.content)
        return {**state, "generated_text": text, "error": None}
    except Exception as e:
        print(f"❌ Text generation failed: {e}")
        return {**state, "error": str(e)}
Monitoring & Observability
Key metrics to track (a minimal timing helper is sketched below):
- Generation Metrics:
  - Text generation latency (DeepSeek)
  - TTS latency per chunk (OpenAI)
  - Total pipeline duration
- Quality Metrics:
  - Text length vs. target duration
  - Chunk count and size distribution
  - Audio segment file sizes
- Infrastructure Metrics:
  - R2 upload success rate
  - Database checkpoint writes
  - ffmpeg merge success rate
- Cost Metrics:
  - DeepSeek token usage
  - OpenAI TTS character count
  - R2 storage and bandwidth
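These metrics do not require heavy tooling to get started. A small decorator that times each node and emits a structured log line, sketched below as an assumption rather than part of the described system, is often enough to feed a dashboard:
import functools
import json
import logging
import time

logger = logging.getLogger("tts_pipeline")

def timed_node(node_name: str):
    """Wrap a graph node and log its duration and outcome as structured JSON."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(state):
            start = time.perf_counter()
            result = await fn(state)
            logger.info(json.dumps({
                "node": node_name,
                "duration_s": round(time.perf_counter() - start, 2),
                "error": result.get("error"),
            }))
            return result
        return wrapper
    return decorator

# Usage:
# @timed_node("generate_text")
# async def node_generate_text(state: TextState) -> TextState: ...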
Scaling Patterns
Horizontal Scaling:
- FastAPI instances behind load balancer
- Stateless design (state in Postgres)
- R2 for distributed storage
Batch Processing:
import asyncio
from typing import List

async def batch_generate_audio(goal_ids: List[int]):
    """Process multiple goals in parallel."""
    tasks = [run_pipeline(build_state(id), f"batch-{id}") for id in goal_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
Queue-Based Processing:
- Use background tasks for long-running jobs
- Celery/Redis for distributed task queue
- Webhook callbacks for completion notifications
Performance Optimization
Chunking Optimization
# Optimize chunk size for TTS quality vs API limits
OPTIMAL_CHUNK_SIZE = 3000  # Sweet spot for natural pauses

# Parallel TTS generation (with rate limiting)
async def parallel_tts_generation(chunks: List[str], max_concurrent: int = 3):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def generate_with_limit(chunk, index):
        async with semaphore:
            return await generate_tts_segment(chunk, index)

    tasks = [generate_with_limit(c, i) for i, c in enumerate(chunks)]
    return await asyncio.gather(*tasks)
Caching Strategy
from datetime import datetime, timedelta
from functools import lru_cache

# Cache research results for similar goals
@lru_cache(maxsize=100)
def get_research_for_goal_type(therapeutic_type: str, age: int):
    """Cache research by type + age bracket."""
    return fetch_research(therapeutic_type, age)

# Cache generated text for re-use
async def get_or_generate_text(goal_id: int):
    existing = await db.fetch_story(goal_id)
    if existing and existing.created_at > datetime.now() - timedelta(days=7):
        return existing.text
    return await generate_new_text(goal_id)
Testing Strategy
Unit Tests
def test_chunk_text_respects_limit():
    long_text = "word " * 2000
    chunks = chunk_text(long_text, max_chars=4000)
    for chunk in chunks:
        assert len(chunk) <= 4000

def test_clean_for_tts_removes_markdown():
    text = "# Title\n\n**bold** and `code`"
    cleaned = clean_for_tts(text)
    assert "#" not in cleaned
    assert "**" not in cleaned
    assert "`" not in cleaned
Integration Tests
@pytest.mark.asyncio
async def test_full_pipeline():
    state = {
        "goal_id": 1,
        "goal_title": "Test anxiety reduction",
        "therapeutic_goal_type": "anxiety_reduction",
        "age": 8,
        # ... other fields
    }
    result = await run_pipeline(state, "test-thread-1")

    assert result["generated_text"] is not None
    assert len(result["chunks"]) > 0
    assert result["audio_url"] is not None
    assert result["error"] is None
Lessons Learned
1. State Design is Critical
- Use TypedDict for type safety
- Keep state flat (avoid deep nesting)
- Include metadata for debugging (timestamps, IDs)
2. Checkpoint Strategically
- Not all workflows need checkpointing
- Audio-only mode: disable checkpoints to avoid schema issues
- Use thread_id conventions: {workflow}-{entity_id}-{timestamp} (see the helper sketch below)
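A tiny helper keeps that convention consistent across workflows; the function below is an illustrative sketch of the naming scheme, not code from the system:
import time

def make_thread_id(workflow: str, entity_id: int | str) -> str:
    """Build a thread_id like 'text-42-1717430400' per the convention above."""
    return f"{workflow}-{entity_id}-{int(time.time())}"

# Examples:
# make_thread_id("text", 42)   -> "text-42-<unix timestamp>"
# make_thread_id("audio", 42)  -> "audio-42-<unix timestamp>"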
3. Error Recovery
- Graceful degradation (segments work even if merge fails)
- Fallback strategies (binary concat if ffmpeg unavailable)
- Preserve partial results (individual segments in R2)
4. Cost Management
- Monitor token usage (DeepSeek is cost-effective at $0.28 per 1M tokens)
- OpenAI TTS: $15 per 1M characters
- R2 storage: $0.015/GB/month (much cheaper than S3)
5. Content Quality
- Structured frameworks improve consistency
- Repetition aids retention and comprehension
- Audience-appropriate language is crucial for engagement
Future Enhancements
1. Multi-Voice Narratives
# Support character dialogue with different voices
voices = {
    "narrator": "cedar",
    "child_character": "nova",
    "parent_character": "marin",
}
2. Emotion-Adaptive TTS
# Adjust voice parameters based on content emotion
def get_tts_params(text: str) -> dict:
    sentiment = analyze_sentiment(text)
    if sentiment == "calm":
        return {"speed": 0.9, "pitch": 0}
    elif sentiment == "energetic":
        return {"speed": 1.1, "pitch": 2}
    return {"speed": 1.0, "pitch": 0}  # Neutral default
3. Real-Time Streaming
# Stream audio as it's generated (SSE)
async def stream_audio_generation(goal_id: int):
    async for chunk_url in generate_audio_stream(goal_id):
        yield f"data: {json.dumps({'chunk_url': chunk_url})}\n\n"
4. Multilingual Support
- Expand beyond Romanian and English
- Voice selection per language
- Cultural adaptation of content frameworks
Conclusion
This LangGraph-based TTS architecture demonstrates several key patterns:
- Composable Workflows: Three distinct pipelines sharing common components
- Conditional Routing: Smart flow control based on state
- Durable Execution: PostgreSQL checkpointing for resilience
- Streaming Efficiency: Direct-to-disk TTS for memory optimization
- Distributed Storage: R2 for globally accessible audio
The system successfully processes 5-30+ minute long-form narratives (up to 7,000+ words), generating research-backed content, converting to high-quality audio, and delivering via CDN—all while maintaining resumability after failures and full observability.
Real-World Performance:
- 30-minute generation: 12-15 TTS chunks, ~3-5 minutes total processing time
- Failure recovery: Resume from any checkpoint in <1 second
- Cost efficiency: $0.07 per 30-minute audio (DeepSeek + OpenAI TTS)
- Throughput: 10+ concurrent jobs on single instance
Key Takeaways for Long-Running Pipelines:
- LangGraph + Postgres checkpointing is essential for long-form workflows
- Streaming TTS to disk prevents memory exhaustion on long generations
- Smart chunking (4K chars) balances API limits with narrative coherence
- Immediate R2 uploads ensure partial results survive crashes
- Async architecture enables fire-and-forget long operations
- Thread-based recovery makes interrupted jobs trivial to resume
The architecture scales to long-form audio generation: audiobooks (10+ hours), comprehensive courses, documentary narration, or serialized storytelling—any use case where reliability and resumability are non-negotiable.
This architecture powers long-form audio generation, combining LangGraph orchestration, OpenAI TTS streaming, and distributed storage for production-ready AI audio systems.
