
Building Long-Running TTS Pipelines with LangGraph: Orchestrating Long-Form Audio Generation

· 17 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

Generating long-form audio content—audiobooks spanning hours, educational courses, or extended podcasts—presents unique challenges: API rate limits, network failures, resource constraints, and the sheer duration of processing. This article explores a production-ready architecture for long-running TTS pipelines that can gracefully handle long-form generation tasks, resume after failures, and maintain state across distributed systems.

Built with LangGraph, the system orchestrates complex workflows involving AI content generation (DeepSeek), text-to-speech conversion (OpenAI TTS), and distributed storage (Cloudflare R2). The key innovation: PostgreSQL checkpointing enables resumable execution, making it possible to generate 5-30+ minute audio segments reliably, even when individual API calls or processing steps fail.

The Challenge: Long-Form Audio at Scale

Why Long-Running Pipelines Are Hard

Traditional TTS approaches fail at scale:

  1. Time Constraints: A 30-minute audio narrative requires ~4,500 words, chunked into 10-15 API calls, taking 2-5 minutes to generate
  2. Failure Points: Each step (text generation, chunking, TTS, storage) can fail independently
  3. Memory Pressure: Holding all audio segments in memory for hours is impractical
  4. Cost Management: Retrying from scratch wastes API credits and compute time
  5. State Loss: Without persistence, crashes mean starting over

Our Solution: Stateful Orchestration

  • LangGraph manages workflow state transitions
  • PostgreSQL persists checkpoints after each successful step
  • R2 provides durable storage for completed segments
  • Resumable execution using thread_id for job recovery

System Overview

The pipeline orchestrates three main workflows:

  1. Research Generation: Structured content research using DeepSeek
  2. Narrative Text Generation: Long-form content creation with context awareness
  3. Audio Synthesis: Text-to-speech conversion with OpenAI TTS and Cloudflare R2 storage

Tech Stack

  • LangGraph: State machine orchestration with built-in checkpointing
  • DeepSeek: Long-form text generation (deepseek-chat, 2500+ token outputs)
  • OpenAI TTS: Streaming audio synthesis (gpt-4o-mini-tts, 4096 char limit per request)
  • PostgreSQL: Durable checkpointing for long-running jobs (Neon serverless for production)
  • Cloudflare R2: S3-compatible storage with zero egress fees (critical for multi-GB audio)
  • FastAPI: Async REST API for non-blocking long operations
  • Docker: Containerized deployment with ffmpeg for audio merging

Why This Stack for Long-Running Jobs:

  • Postgres checkpointing: Resume from any point in the workflow (text generation → chunking → TTS → upload)
  • Streaming TTS: Memory-efficient direct-to-disk writes (no buffering entire audio in RAM)
  • R2 durability: Segments uploaded immediately, survive process crashes
  • Async execution: Non-blocking background processing for hours-long jobs

Architecture Patterns

1. Core LangGraph State Machine

The system implements three distinct LangGraph workflows, each optimized for specific tasks.
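
The exact graph wiring lives in the service code, but a minimal sketch of how one of these workflows can be assembled looks like this (node names and edges are simplified assumptions; the routing helpers and node functions referenced here appear later in this article):

# Sketch only: node functions (node_generate_text, node_chunk_text,
# node_generate_audio) and routing helpers are defined elsewhere; the real
# pipeline contains additional nodes (e.g. database saves).
from langgraph.graph import StateGraph, END

def build_graph(checkpointer=None):
    g = StateGraph(TextState)

    g.add_node("generate_text", node_generate_text)
    g.add_node("chunk_text", node_chunk_text)
    g.add_node("generate_audio", node_generate_audio)

    # Entry point decides between full generation and audio-only mode
    g.set_conditional_entry_point(
        should_skip_text_generation,
        {"generate_text": "generate_text", "chunk_text": "chunk_text"},
    )
    g.add_conditional_edges(
        "generate_text",
        should_generate_audio,
        {"chunk_text": "chunk_text", END: END},
    )
    g.add_edge("chunk_text", "generate_audio")
    g.add_edge("generate_audio", END)

    # The checkpointer (e.g. AsyncPostgresSaver) persists state after every node
    return g.compile(checkpointer=checkpointer)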

2. Research Generation Pipeline

The research pipeline generates structured research content using a focused LangGraph workflow.

Key Features:

  • Low temperature (0.3) for factual accuracy
  • Structured JSON output with validation
  • Evidence level classification (A/B/C)
  • Relevance scoring for topic matching
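
A condensed sketch of what the research node can look like, assuming a hypothetical node_research function and a generic state dict; the ChatDeepSeek settings mirror the low-temperature, JSON-validated approach described above:

# Sketch of the research node (hypothetical names); low temperature for
# factual output, JSON parsed and filtered before it enters the state.
import json
from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage

async def node_research(state: dict) -> dict:
    llm = ChatDeepSeek(model="deepseek-chat", temperature=0.3)
    prompt = (
        f"Return a JSON array of research items for: {state['title']}. "
        'Each item needs "summary", "evidence_level" (A/B/C) and '
        '"relevance_score" (0-1).'
    )
    resp = await llm.ainvoke([HumanMessage(content=prompt)])

    try:
        items = json.loads(resp.content)
    except json.JSONDecodeError:
        return {**state, "research_items": [], "error": "invalid JSON from model"}

    # Keep only well-formed items with a recognized evidence level
    items = [i for i in items if i.get("evidence_level") in ("A", "B", "C")]
    return {**state, "research_items": items, "error": None}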

3. Long-Form Text Generation Pipeline

The most sophisticated workflow, supporting both full generation and audio-only modes.

Conditional Routing Logic:

def should_skip_text_generation(state: TextState) -> str:
    """Route to text generation or skip to audio."""
    if state.get("existing_content") and state["existing_content"].get("text"):
        return "chunk_text"  # Audio-only mode
    return "generate_text"  # Full generation

def should_generate_audio(state: TextState) -> str:
    """Route to audio generation or end."""
    if state.get("generate_audio", True):
        return "chunk_text"
    return END  # Text-only mode

4. Audio Generation Pipeline (Standalone)

A simplified pipeline for generic long-form narration.

Iterative Chunk Processing:

The system uses a recursive edge pattern for processing chunks:

g.add_conditional_edges(
    "tts_one_chunk",
    edge_should_continue,
    {
        "tts_one_chunk": "tts_one_chunk",  # Loop back
        "finalize": "finalize",            # Exit loop
    },
)

def edge_should_continue(state: JobState) -> str:
    if state["chunk_index"] < len(state["chunks"]):
        return "tts_one_chunk"
    return "finalize"

Deep Dive: Key Architectural Components

State Management

LangGraph uses typed state dictionaries for type safety and IDE support:

from typing import List, Optional, TypedDict

class TextState(TypedDict):
    # Input metadata
    content_id: int
    title: str
    content_type: str
    language: str
    target_duration_minutes: int | None

    # Generation data
    research_items: list[dict]
    existing_content: dict | None
    generated_text: str | None

    # TTS fields
    voice: str
    chunks: List[str]
    segment_urls: List[str]
    manifest_url: Optional[str]
    audio_url: Optional[str]

    # Control flow
    generate_audio: bool
    database_saved: bool
    error: str | None

Postgres Checkpointing: The Key to Long-Running Resilience

For long-running jobs, checkpointing is non-negotiable. Without it, a network glitch at minute 25 of a 30-minute generation means restarting from scratch.

How Checkpointing Works:

import os

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def run_pipeline(state: TextState, thread_id: str):
    db_url = os.getenv("DATABASE_URL")

    async with AsyncPostgresSaver.from_conn_string(db_url) as checkpointer:
        await checkpointer.setup()  # Creates checkpoint tables
        app = build_graph(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": thread_id}}

        # LangGraph automatically saves state after each node execution
        final_state = await app.ainvoke(state, config=config)
        return final_state

What Gets Checkpointed:

  • Complete state dictionary after each node
  • Edge transitions and routing decisions
  • Timestamps and execution metadata
  • Partial results (generated text, uploaded segment URLs)

Recovery Example:

# Job crashes after generating 8 of 12 TTS segments
# Resume with same thread_id:
final_state = await run_pipeline(initial_state, thread_id="job-12345")

# LangGraph:
# 1. Loads last checkpoint from Postgres
# 2. Sees 8 segments already uploaded to R2
# 3. Continues from segment 9
# 4. Completes remaining 4 segments

Production Benefits:

  • Cost Savings: No wasted API calls on retry
  • Time Efficiency: Resume from 80% complete, not 0%
  • Reliability: Transient failures (rate limits, timeouts) don't kill long-form jobs
  • Observability: Query the checkpoint table to monitor progress (see the sketch after this list)
  • Parallel Execution: Multiple jobs with different thread_id values
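
For the observability point above, the same checkpointer can be queried for a job's latest state; a sketch assuming the build_graph helper from this article:

# Sketch: inspect a running job's progress from its latest checkpoint.
import os

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def get_job_progress(thread_id: str) -> dict:
    db_url = os.getenv("DATABASE_URL")
    async with AsyncPostgresSaver.from_conn_string(db_url) as checkpointer:
        app = build_graph(checkpointer=checkpointer)
        snapshot = await app.aget_state({"configurable": {"thread_id": thread_id}})
        state = snapshot.values
        return {
            "chunks_total": len(state.get("chunks", [])),
            "segments_done": len(state.get("segment_urls", [])),
            "error": state.get("error"),
        }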

Text Chunking Algorithm: Optimizing for Long-Form Narration

For 30-minute audio (4,500+ words), naive chunking creates jarring transitions. Our algorithm balances API constraints with narrative flow:

Constraints:

  • OpenAI TTS: 4,096 character limit per request
  • Target: ~4,000 chars per chunk (safety margin)
  • Goal: Natural pauses at paragraph/sentence boundaries

Strategy:

import re
from typing import List

def chunk_text(text: str, max_chars: int = 4000) -> List[str]:
    """
    Multi-level chunking for long-form content:

    1. Split by paragraphs (blank lines) - natural topic boundaries
    2. Accumulate paragraphs until approaching the 4K limit
    3. If a single paragraph > 4K, split by sentences
    4. If a single sentence > 4K, split mid-sentence (rare edge case)

    Result: 10-15 chunks for 30-min audio, each ending at a natural pause
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks: List[str] = []
    buf: List[str] = []

    for p in paragraphs:
        candidate = "\n\n".join(buf + [p]) if buf else p
        if len(candidate) <= max_chars:
            buf.append(p)
            continue

        # Flush accumulated paragraphs as one chunk
        if buf:
            chunks.append("\n\n".join(buf))
            buf = []

        if len(p) <= max_chars:
            buf = [p]
            continue

        # Paragraph too large - split by sentences with the same accumulation logic
        sentences = re.split(r"(?<=[.!?])\s+", p)
        sent_buf: List[str] = []
        for s in sentences:
            cand = " ".join(sent_buf + [s]) if sent_buf else s
            if len(cand) <= max_chars:
                sent_buf.append(s)
                continue
            if sent_buf:
                chunks.append(" ".join(sent_buf))
                sent_buf = []
            # Sentence itself too large - hard split mid-sentence (rare)
            while len(s) > max_chars:
                chunks.append(s[:max_chars])
                s = s[max_chars:]
            if s:
                sent_buf = [s]
        if sent_buf:
            chunks.append(" ".join(sent_buf))

    if buf:
        chunks.append("\n\n".join(buf))
    return chunks

Why This Matters for Long-Form:

  • Seamless Merging: Chunk boundaries at natural pauses prevent audio glitches
  • Even Distribution: Avoids tiny final chunks (better for progress tracking)
  • Memory Efficiency: Process one chunk at a time, not entire 4,500-word text
  • Resumability: Each chunk is independent; can resume mid-sequence

OpenAI TTS Streaming

Efficient audio generation using streaming responses:

from pathlib import Path

from openai import OpenAI

async def node_tts_one_chunk(state: JobState) -> JobState:
    chunk_text = state["chunks"][state["chunk_index"]]
    segment_path = Path(f"segment_{state['chunk_index']:04d}.mp3")

    client = OpenAI()

    # Stream directly to disk (memory efficient)
    with client.audio.speech.with_streaming_response.create(
        model="gpt-4o-mini-tts",
        voice=state["voice"],
        input=chunk_text,
        response_format="mp3",
    ) as response:
        response.stream_to_file(segment_path)

    # Upload to R2
    r2_url = upload_to_r2(segment_path, state["job_id"])

    return {
        **state,
        "segment_urls": [*state["segment_urls"], r2_url],
        "chunk_index": state["chunk_index"] + 1,
    }

Audio Merging Strategy

The system uses ffmpeg for high-quality concatenation:

async def node_generate_audio(state: TextState) -> TextState:
    # Generate all segments...

    # Create concat list for ffmpeg
    file_list_path.write_text(
        "\n".join(f"file '{segment}'" for segment in segment_paths)
    )

    # Merge using ffmpeg (codec copy - fast and lossless)
    subprocess.run([
        "ffmpeg", "-f", "concat", "-safe", "0",
        "-i", str(file_list_path),
        "-c", "copy",  # No re-encoding
        str(merged_path),
    ])

    # Fallback to binary concatenation if ffmpeg unavailable
    if not merged_path.exists():
        with open(merged_path, "wb") as merged:
            for segment in segment_paths:
                merged.write(segment.read_bytes())

Cloudflare R2 Integration

S3-compatible storage for globally distributed audio:

import boto3
from botocore.config import Config

def get_r2_client():
    return boto3.client(
        's3',
        endpoint_url=f'https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com',
        aws_access_key_id=R2_ACCESS_KEY_ID,
        aws_secret_access_key=R2_SECRET_ACCESS_KEY,
        config=Config(signature_version='s3v4'),
    )

def upload_to_r2(file_path: Path, job_id: str) -> str:
    key = f"{job_id}/{file_path.name}"

    client = get_r2_client()
    client.put_object(
        Bucket=R2_BUCKET_NAME,
        Key=key,
        Body=file_path.read_bytes(),
        ContentType='audio/mpeg',
    )

    return f"{R2_PUBLIC_DOMAIN}/{key}"

Structured Content Generation

Narrative Architecture Framework

The system implements a flexible content framework with customizable sections:

Key Components:

  1. Introduction (2-3 min): Hook the listener and set expectations
  2. Context: Background information and relevance
  3. Core Content: Main topic introduction with clear structure
  4. Examples: Concrete illustrations and case studies
  5. Deep Dive: Detailed exploration of key concepts
  6. Applications: Practical use cases and implementation
  7. Advanced Topics: Nuanced discussion for engaged learners
  8. Synthesis: Connect all concepts together
  9. Takeaways: Summary of key points
  10. Conclusion: Clear closing and next steps
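
The generate_content_architecture helper referenced in the next section can be as simple as rendering this framework into the prompt; a sketch (the real implementation may differ):

# Sketch of a content-architecture helper; returns a section outline the
# prompt builder can embed directly. Section descriptions follow the list above.
SECTIONS = [
    ("Introduction", "Hook the listener and set expectations (2-3 min)"),
    ("Context", "Background information and relevance"),
    ("Core Content", "Main topic introduction with clear structure"),
    ("Examples", "Concrete illustrations and case studies"),
    ("Deep Dive", "Detailed exploration of key concepts"),
    ("Applications", "Practical use cases and implementation"),
    ("Advanced Topics", "Nuanced discussion for engaged learners"),
    ("Synthesis", "Connect all concepts together"),
    ("Takeaways", "Summary of key points"),
    ("Conclusion", "Clear closing and next steps"),
]

def generate_content_architecture(content_type: str) -> str:
    lines = [f"STRUCTURE ({content_type}):"]
    lines += [f"{i}. {name}: {desc}" for i, (name, desc) in enumerate(SECTIONS, 1)]
    return "\n".join(lines)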

Dynamic Content Adaptation

def build_content_prompt(state: TextState) -> str:
    minutes = state.get("target_duration_minutes") or 5
    target_words = int(minutes * 150)  # ~150 words per minute of narration

    content_type = state.get("content_type")

    # Select architecture based on content type
    architecture = generate_content_architecture(content_type)

    return f"""
Create a {state['language']} narrative for audio:

TOPIC: {state['title']}
TYPE: {content_type}
TARGET: {target_words} words ({minutes} minutes)

{architecture}

RESEARCH CONTEXT:
{format_research_summary(state['research_items'])}

Requirements:
- Plain text only (no markdown)
- Natural paragraph breaks
- Engaging, clear tone
- Appropriate language for audio listening
"""

API Endpoints

FastAPI Service Layer

Endpoint Implementations:

@app.post("/api/research/generate")
async def research_endpoint(req: ResearchRequest):
"""Generate research context using LangGraph + DeepSeek."""
return await generate_research(req)

@app.post("/api/text/generate")
async def text_endpoint(req: TextGenerationRequest):
"""Generate long-form text content (text-only mode)."""
return await generate_text(req)

@app.post("/api/audio/generate")
async def audio_endpoint(req: TextGenerationRequest):
"""Generate audio from existing content (audio-only mode)."""
return await generate_audio(req)

@app.post("/api/tts/generate")
async def tts_endpoint(req: TTSRequest, background_tasks: BackgroundTasks):
"""Generic TTS generation (fire-and-forget)."""
return await generate_tts(req, background_tasks)
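
The request models are plain Pydantic classes; a sketch of what TextGenerationRequest and TTSRequest might look like, with field names inferred from the state dictionary (treat them as assumptions, not the exact production schema):

# Sketch of the request models (field names inferred from the state dict).
from typing import Optional
from pydantic import BaseModel

class TextGenerationRequest(BaseModel):
    content_id: int
    title: str
    content_type: str
    language: str = "en"
    target_duration_minutes: Optional[int] = None
    voice: str = "alloy"
    generate_audio: bool = True

class TTSRequest(BaseModel):
    text: str
    voice: str = "alloy"
    job_id: Optional[str] = None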

Deployment Architecture

Docker Containerization

FROM python:3.12-slim

WORKDIR /app

# Install ffmpeg for audio merging
RUN apt-get update && apt-get install -y ffmpeg && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose port
EXPOSE 8080

# Run FastAPI server
CMD ["uvicorn", "langgraph_server:app", "--host", "0.0.0.0", "--port", "8080"]

Environment Configuration

# AI Services
DEEPSEEK_API_KEY=sk-...
OPENAI_API_KEY=sk-...

# Database (Neon Postgres)
DATABASE_URL=postgresql://user:pass@host/db?sslmode=require

# Cloudflare R2
R2_ACCOUNT_ID=...
R2_ACCESS_KEY_ID=...
R2_SECRET_ACCESS_KEY=...
R2_BUCKET_NAME=longform-tts
R2_PUBLIC_DOMAIN=https://pub-longform-tts.r2.dev
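
Because a missing credential only surfaces minutes into a long job, it pays to validate this configuration at startup; a small sketch:

# Sketch: fail fast at startup if required configuration is missing.
import os

REQUIRED_ENV = [
    "DEEPSEEK_API_KEY", "OPENAI_API_KEY", "DATABASE_URL",
    "R2_ACCOUNT_ID", "R2_ACCESS_KEY_ID", "R2_SECRET_ACCESS_KEY",
    "R2_BUCKET_NAME", "R2_PUBLIC_DOMAIN",
]

def validate_env() -> None:
    missing = [name for name in REQUIRED_ENV if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")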

Cloudflare Workers Deployment

# wrangler.toml
name = "langgraph-tts"
compatibility_date = "2024-01-01"

[build]
command = "docker build -t langgraph-tts ."

[[services]]
name = "langgraph-tts"
image = "langgraph-tts:latest"

[env]
PORT = "8080"

[[r2_buckets]]
binding = "LONGFORM_TTS"
bucket_name = "longform-tts"

Production Considerations

Performance Metrics for Long-Running Jobs

Benchmarks (30-minute audio generation):

| Stage | Duration | Checkpointed | Retryable |
|---|---|---|---|
| Text Generation (DeepSeek) | 30-60s | ✅ After completion | ✅ Full retry |
| Text Chunking | <1s | ✅ After completion | ✅ Instant |
| TTS Segments (1–12) | 10-20s each | ✅ After each segment | ✅ Per-segment |
| Audio Merging (ffmpeg) | 1–3s | ✅ After completion | ✅ Full retry |
| R2 Upload (merged) | 2-5s | ✅ After completion | ✅ Full retry |
| Total Pipeline | 3-5 minutes | 15+ checkpoints | Granular recovery |

Long-Running Job Profile:

# Example: 2-hour audiobook chapter
# text_length   = ~18,000 words
# chunks        = 45 (~4,000 chars each)
# tts_time      = 45 * ~15s ≈ 11.25 minutes
# text_gen_time = 2-3 minutes
# total_time    ≈ 15 minutes of processing for 2 hours of audio

# Checkpoint frequency:
# - 1 after text generation
# - 45, one after each TTS segment
# - 1 after merge
# Total: 47 recovery points

Failure Recovery Times:

  • Crash at 80% complete → Resume in 1–2 seconds, continue from segment 36/45
  • Network timeout on segment 20 → Retry only segment 20, not segments 1–19
  • Database connection loss → Reconnect and load last checkpoint (<500ms)

Error Handling & Resilience

async def node_generate_text(state: TextState) -> TextState:
    try:
        llm = ChatDeepSeek(model="deepseek-chat", temperature=0.7, max_tokens=2500)
        prompt = build_content_prompt(state)

        resp = await llm.ainvoke([HumanMessage(content=prompt)])
        text = clean_for_tts(resp.content)

        return {**state, "generated_text": text, "error": None}
    except Exception as e:
        print(f"❌ Text generation failed: {e}")
        return {**state, "error": str(e)}

Monitoring & Observability

Key metrics to track:

  1. Generation Metrics:

    • Text generation latency (DeepSeek)
    • TTS latency per chunk (OpenAI)
    • Total pipeline duration
  2. Quality Metrics:

    • Text length vs target duration
    • Chunk count and size distribution
    • Audio segment file sizes
  3. Infrastructure Metrics:

    • R2 upload success rate
    • Database checkpoint writes
    • ffmpeg merge success rate
  4. Cost Metrics:

    • DeepSeek token usage
    • OpenAI TTS character count
    • R2 storage and bandwidth
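
A lightweight way to capture the per-node latency metrics above is a decorator around node functions; a sketch that logs timings (any metrics backend can consume the same data):

# Sketch: time each node and emit a metric line via logging.
import logging
import time
from functools import wraps

logger = logging.getLogger("pipeline.metrics")

def timed_node(name: str):
    def decorator(fn):
        @wraps(fn)
        async def wrapper(state):
            start = time.perf_counter()
            try:
                return await fn(state)
            finally:
                elapsed = time.perf_counter() - start
                logger.info("node=%s duration_s=%.2f", name, elapsed)
        return wrapper
    return decorator

# usage: decorate node functions, e.g. @timed_node("generate_text")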

Scaling Patterns

Horizontal Scaling:

  • FastAPI instances behind load balancer
  • Stateless design (state in Postgres)
  • R2 for distributed storage

Batch Processing:

import asyncio

async def batch_generate_audio(goal_ids: List[int]):
    """Process multiple goals in parallel."""
    tasks = [run_pipeline(build_state(id), f"batch-{id}") for id in goal_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

Queue-Based Processing:

  • Use background tasks for long-running jobs
  • Celery/Redis for distributed task queue
  • Webhook callbacks for completion notifications
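
A sketch of the queue-based pattern using FastAPI background tasks with a completion webhook; the endpoint path, callback_url parameter, and build_state usage are illustrative assumptions:

# Sketch: fire-and-forget job with an optional completion webhook.
import httpx
from fastapi import BackgroundTasks

async def run_and_notify(state: dict, thread_id: str, callback_url: str | None):
    final_state = await run_pipeline(state, thread_id)
    if callback_url:
        async with httpx.AsyncClient() as http:
            await http.post(callback_url, json={
                "thread_id": thread_id,
                "audio_url": final_state.get("audio_url"),
                "error": final_state.get("error"),
            })

@app.post("/api/audio/generate-async")
async def generate_async(
    req: TextGenerationRequest,
    background_tasks: BackgroundTasks,
    callback_url: str | None = None,
):
    thread_id = f"audio-{req.content_id}"
    background_tasks.add_task(run_and_notify, build_state(req), thread_id, callback_url)
    return {"status": "started", "thread_id": thread_id}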

Performance Optimization

Chunking Optimization

# Optimize chunk size for TTS quality vs API limits
OPTIMAL_CHUNK_SIZE = 3000  # Sweet spot for natural pauses

# Parallel TTS generation (with rate limiting)
async def parallel_tts_generation(chunks: List[str], max_concurrent: int = 3):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def generate_with_limit(chunk, index):
        async with semaphore:
            return await generate_tts_segment(chunk, index)

    tasks = [generate_with_limit(c, i) for i, c in enumerate(chunks)]
    return await asyncio.gather(*tasks)

Caching Strategy

from datetime import datetime, timedelta
from functools import lru_cache

# Cache research results for similar goals
@lru_cache(maxsize=100)
def get_research_for_goal_type(therapeutic_type: str, age: int):
    """Cache research by type + age bracket."""
    return fetch_research(therapeutic_type, age)

# Cache generated text for re-use
async def get_or_generate_text(goal_id: int):
    existing = await db.fetch_story(goal_id)
    if existing and existing.created_at > datetime.now() - timedelta(days=7):
        return existing.text
    return await generate_new_text(goal_id)

Testing Strategy

Unit Tests

def test_chunk_text_respects_limit():
    long_text = "word " * 2000
    chunks = chunk_text(long_text, max_chars=4000)

    for chunk in chunks:
        assert len(chunk) <= 4000

def test_clean_for_tts_removes_markdown():
    text = "# Title\n\n**bold** and `code`"
    cleaned = clean_for_tts(text)
    assert "#" not in cleaned
    assert "**" not in cleaned
    assert "`" not in cleaned

Integration Tests

import pytest

@pytest.mark.asyncio
async def test_full_pipeline():
    state = {
        "goal_id": 1,
        "goal_title": "Test anxiety reduction",
        "therapeutic_goal_type": "anxiety_reduction",
        "age": 8,
        # ... other fields
    }

    result = await run_pipeline(state, "test-thread-1")

    assert result["generated_text"] is not None
    assert len(result["chunks"]) > 0
    assert result["audio_url"] is not None
    assert result["error"] is None

Lessons Learned

1. State Design is Critical

  • Use TypedDict for type safety
  • Keep state flat (avoid deep nesting)
  • Include metadata for debugging (timestamps, IDs)

2. Checkpoint Strategically

  • Not all workflows need checkpointing
  • Audio-only mode: disable checkpoints to avoid schema issues
  • Use thread_id conventions: {workflow}-{entity_id}-{timestamp}
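
That convention is easy to encode in a small helper; for example:

# Sketch: build resumable thread IDs following {workflow}-{entity_id}-{timestamp}.
import time

def make_thread_id(workflow: str, entity_id: int) -> str:
    return f"{workflow}-{entity_id}-{int(time.time())}"

# e.g. make_thread_id("audio", 12345) -> "audio-12345-1718000000"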

3. Error Recovery

  • Graceful degradation (segments work even if merge fails)
  • Fallback strategies (binary concat if ffmpeg unavailable)
  • Preserve partial results (individual segments in R2)

4. Cost Management

  • Monitor token usage (DeepSeek is cost-effective at roughly $0.14 per 1M input tokens and $0.28 per 1M output tokens)
  • OpenAI TTS: $15 per 1M characters
  • R2 storage: $0.015/GB/month (much cheaper than S3)

5. Content Quality

  • Structured frameworks improve consistency
  • Repetition aids retention and comprehension
  • Audience-appropriate language is crucial for engagement

Future Enhancements

1. Multi-Voice Narratives

# Support character dialogue with different voices
voices = {
    "narrator": "cedar",
    "child_character": "nova",
    "parent_character": "marin",
}

2. Emotion-Adaptive TTS

# Adjust voice parameters based on content emotion
def get_tts_params(text: str) -> dict:
    sentiment = analyze_sentiment(text)

    if sentiment == "calm":
        return {"speed": 0.9, "pitch": 0}
    elif sentiment == "energetic":
        return {"speed": 1.1, "pitch": 2}
    return {"speed": 1.0, "pitch": 0}  # neutral default

3. Real-Time Streaming

# Stream audio as it's generated (SSE)
async def stream_audio_generation(goal_id: int):
    async for chunk_url in generate_audio_stream(goal_id):
        yield f"data: {json.dumps({'chunk_url': chunk_url})}\n\n"

4. Multilingual Support

  • Expand beyond Romanian and English
  • Voice selection per language
  • Cultural adaptation of content frameworks

Conclusion

This LangGraph-based TTS architecture demonstrates several key patterns:

  1. Composable Workflows: Three distinct pipelines sharing common components
  2. Conditional Routing: Smart flow control based on state
  3. Durable Execution: PostgreSQL checkpointing for resilience
  4. Streaming Efficiency: Direct-to-disk TTS for memory optimization
  5. Distributed Storage: R2 for globally accessible audio

The system successfully processes 5-30+ minute long-form narratives (up to 7,000+ words), generating research-backed content, converting to high-quality audio, and delivering via CDN—all while maintaining resumability after failures and full observability.

Real-World Performance:

  • 30-minute generation: 12-15 TTS chunks, ~3-5 minutes total processing time
  • Failure recovery: Resume from any checkpoint in <1 second
  • Cost efficiency: $0.02-$0.07 per 30-minute audio (DeepSeek + OpenAI TTS)
  • Throughput: 10+ concurrent jobs on single instance

Key Takeaways for Long-Running Pipelines:

  • LangGraph + Postgres checkpointing is essential for long-form workflows
  • Streaming TTS to disk prevents memory exhaustion on long generations
  • Smart chunking (4K chars) balances API limits with narrative coherence
  • Immediate R2 uploads ensure partial results survive crashes
  • Async architecture enables fire-and-forget long operations
  • Thread-based recovery makes interrupted jobs trivial to resume

The architecture scales to long-form audio generation: audiobooks (10+ hours), comprehensive courses, documentary narration, or serialized storytelling—any use case where reliability and resumability are non-negotiable.


This architecture powers long-form audio generation, combining LangGraph orchestration, OpenAI TTS streaming, and distributed storage for production-ready AI audio systems.

How to Integrate OpenAI TTS with FFmpeg in a FastAPI Service

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

OpenAI offers powerful text-to-speech capabilities, enabling developers to generate spoken audio from raw text. Meanwhile, FFmpeg is the de facto standard tool for audio/video processing—used heavily for tasks like merging audio files, converting formats, and applying filters. Combining these two in a FastAPI application can produce a scalable, production-ready text-to-speech (TTS) workflow that merges and manipulates audio via FFmpeg under the hood.

This article demonstrates how to:

  1. Accept text input through a FastAPI endpoint
  2. Chunk text and use OpenAI to generate MP3 segments
  3. Merge generated segments with FFmpeg (through the pydub interface)
  4. Return or store a final MP3 file, ideal for streamlined TTS pipelines

By the end, you’ll understand how to build a simple but effective text-to-speech microservice that leverages the power of OpenAI and FFmpeg.


1. Why Combine OpenAI and FFmpeg

  • Chunked Processing: Long text might exceed certain API limits or timeouts. Splitting into smaller parts ensures each piece is handled reliably.
  • Post-processing: Merging segments, adding intros or outros, or applying custom filters (such as volume adjustments) becomes trivial with FFmpeg.
  • Scalability: A background task system (like FastAPI’s BackgroundTasks) can handle requests without blocking the main thread.
  • Automation: Minimizes manual involvement—one endpoint can receive text and produce a final merged MP3.

2. FastAPI Endpoint and Background Tasks

Below is the FastAPI code that implements a TTS service using the OpenAI API and pydub (which uses FFmpeg internally). It splits the input text into manageable chunks, generates MP3 files per chunk, then merges them:

import os
import time
import logging
from pathlib import Path

from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from openai import OpenAI
from pydub import AudioSegment

load_dotenv(".env.local")

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY)

router = APIRouter()

logging.basicConfig(
    level=logging.DEBUG,  # Set root logger to debug level
    format='%(levelname)s | %(name)s | %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

class AudioRequest(BaseModel):
    input: str

def chunk_text(text: str, chunk_size: int = 4096):
    """
    Generator that yields `text` in chunks of `chunk_size`.
    """
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

@router.post("/speech")
async def generate_speech(request: Request, body: AudioRequest, background_tasks: BackgroundTasks):
"""
Fires off the TTS request in the background (fire-and-forget).
Logs are added to track progress. No zip file is created.
"""
model = "tts-1"
voice = "onyx"

if not body.input:
raise HTTPException(
status_code=400,
detail="Missing required field: input"
)

# Current time for folder naming or logging
timestamp = int(time.time() * 1000)

# Create a folder for storing output
output_folder = Path(".") / f"speech_{timestamp}"
output_folder.mkdir(exist_ok=True)

# Split the input into chunks
chunks = list(chunk_text(body.input, 4096))

# Schedule the actual speech generation in the background
background_tasks.add_task(
generate_audio_files,
chunks=chunks,
output_folder=output_folder,
model=model,
voice=voice,
timestamp=timestamp
)

# Log and return immediately
logger.info(f"Speech generation task started at {timestamp} with {len(chunks)} chunks.")
return JSONResponse({"detail": f"Speech generation started. Timestamp: {timestamp}"})

def generate_audio_files(chunks, output_folder, model, voice, timestamp):
    """
    Generates audio files for each chunk. Runs in the background.
    After all chunks are created, merges them into a single MP3 file.
    """
    try:
        # Generate individual chunk MP3s
        for index, chunk in enumerate(chunks):
            speech_filename = f"speech-chunk-{index + 1}.mp3"
            speech_file_path = output_folder / speech_filename

            logger.info(f"Generating audio for chunk {index + 1}/{len(chunks)}...")

            response = client.audio.speech.create(
                model=model,
                voice=voice,
                input=chunk,
                response_format="mp3",
            )

            response.stream_to_file(speech_file_path)
            logger.info(f"Chunk {index + 1} audio saved to {speech_file_path}")

        # Merge all generated MP3 files into a single file
        logger.info("Merging all audio chunks into one file...")
        merged_audio = AudioSegment.empty()

        def file_index(file_path: Path):
            # Expects file names like 'speech-chunk-1.mp3'
            return int(file_path.stem.split('-')[-1])

        sorted_audio_files = sorted(output_folder.glob("speech-chunk-*.mp3"), key=file_index)
        for audio_file in sorted_audio_files:
            chunk_audio = AudioSegment.from_file(audio_file, format="mp3")
            merged_audio += chunk_audio

        merged_output_file = output_folder / f"speech-merged-{timestamp}.mp3"
        merged_audio.export(merged_output_file, format="mp3")
        logger.info(f"Merged audio saved to {merged_output_file}")

        logger.info(f"All speech chunks generated and merged for timestamp {timestamp}.")
    except Exception as e:
        logger.error(f"OpenAI error (timestamp {timestamp}): {e}")

Key Takeaways

  • AudioRequest model enforces the presence of an input field.
  • chunk_text ensures no chunk exceeds 4096 characters (you can adjust this size).
  • BackgroundTasks offloads the TTS generation so the API can respond promptly.
  • pydub merges MP3 files (which in turn calls FFmpeg).

3. Using FFmpeg Under the Hood

pydub relies on FFmpeg at runtime, so make sure FFmpeg is installed and available in your PATH; otherwise you will get errors when merging or exporting MP3 files. For Linux (Ubuntu/Debian):

sudo apt-get update
sudo apt-get install ffmpeg

For macOS (using Homebrew):

brew install ffmpeg

If you’re on Windows, install FFmpeg from FFmpeg’s official site or use a package manager like chocolatey or scoop.


4. Mermaid JS Diagram

Below is a Mermaid sequence diagram illustrating the workflow:
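
The diagram, sketched in Mermaid syntax from the five steps listed in the explanation that follows:

%% Sketch of the request flow described below (participant names simplified)
sequenceDiagram
    participant User
    participant API as FastAPI
    participant TTS as OpenAI TTS
    participant FF as pydub/FFmpeg

    User->>API: POST /speech (text)
    API-->>User: JSON acknowledgement (timestamp)
    API->>API: schedule background task
    loop for each text chunk
        API->>TTS: audio.speech.create(chunk)
        TTS-->>API: MP3 segment
    end
    API->>FF: merge speech-chunk-*.mp3
    FF-->>API: speech-merged-{timestamp}.mp3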

Explanation:

  1. User sends a POST request with text data.
  2. FastAPI quickly acknowledges the request, then spawns a background task.
  3. Chunks of text are processed via OpenAI TTS, saving individual MP3 files.
  4. pydub merges them (calling FFmpeg behind the scenes).
  5. Final merged file is ready in your output directory.

5. Conclusion

Integrating OpenAI text-to-speech with FFmpeg via pydub in a FastAPI application provides a robust, scalable way to automate TTS pipelines:

  • Reliability: Chunk-based processing handles large inputs without overloading the API.
  • Versatility: FFmpeg’s audio manipulation potential is nearly limitless.
  • Speed: Background tasks ensure the main API remains responsive.

With the sample code above, you can adapt chunk sizes, add authentication, or expand the pipeline to include more sophisticated post-processing (like watermarking, crossfading, or mixing in music). Enjoy building richer audio capabilities into your apps—OpenAI and FFmpeg make a powerful duo.

How to Set Up and Run DeepSeek-R1 Locally With Ollama and FastAPI

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

DeepSeek-R1 is a family of large language models (LLMs) known for advanced natural language capabilities. While hosting an LLM in the cloud can be convenient, local deployment provides greater control over latency, privacy, and resource utilization. Tools like Ollama simplify this process by handling model downloading and quantization. However, to truly scale or integrate these capabilities into other services, you often need a robust REST API layer—FastAPI is perfect for this.

This article covers the entire pipeline:

  1. Installing and configuring Ollama to serve DeepSeek-R1 locally
  2. Interacting with DeepSeek-R1 using the CLI, Python scripts, or a FastAPI endpoint for streaming responses
  3. Demonstrating a minimal FastAPI integration, so you can easily wrap your model in a web service

By the end, you’ll see how to run DeepSeek-R1 locally while benefiting from FastAPI’s scalability, logging, and integration features—all without sending your data to external servers.


1. Why Run DeepSeek-R1 Locally?

Running DeepSeek-R1 on your own machine has multiple advantages:

  • Privacy & Security: No data is sent to third-party services
  • Performance & Low Latency: Local inference avoids remote API calls
  • Customization: Fine-tune or adjust inference parameters as needed
  • No Rate Limits: In-house solution means no usage caps or unexpected cost spikes
  • Offline Availability: Once downloaded, the model runs even without internet access

2. Setting Up DeepSeek-R1 Locally With Ollama

2.1 Installing Ollama

  1. Download Ollama from the official website.
  2. Install it on your machine, just like any application.
Note: Check Ollama's documentation for platform-specific support. It's available on macOS and some Linux distributions.

2.2 Download and Test DeepSeek-R1

Ollama makes model retrieval simple:

ollama run deepseek-r1

This command automatically downloads DeepSeek-R1 (the default variant). If your hardware cannot handle the full 671B-parameter model, specify a smaller distilled version:

ollama run deepseek-r1:7b
Info: DeepSeek-R1 offers different parameter sizes (e.g., 1.5B, 7B, 14B, 70B, 671B) for various hardware setups.

2.3 Running DeepSeek-R1 in the Background

To serve the model continuously (useful for external services like FastAPI):

ollama serve

By default, Ollama listens on http://localhost:11434.


3. Using DeepSeek-R1 Locally

3.1 Command-Line (CLI) Inference

You can chat directly with DeepSeek-R1 in your terminal:

ollama run deepseek-r1

Type a question or prompt; responses stream back in real time.

3.2 Accessing DeepSeek-R1 via API

If you’re building an application, you can call Ollama’s REST API:

curl http://localhost:11434/api/chat -d '{
  "model": "deepseek-r1",
  "messages": [{ "role": "user", "content": "Solve: 25 * 25" }],
  "stream": false
}'
Note: Set "stream": true to receive chunked streaming responses—a feature you can integrate easily into web apps or server frameworks like FastAPI.

3.3 Python Integration

Install the ollama Python package:

pip install ollama

Then use:

import ollama

response = ollama.chat(
    model="deepseek-r1",
    messages=[
        {"role": "user", "content": "Explain Newton's second law of motion"},
    ],
)
print(response["message"]["content"])

4. FastAPI Integration and Streaming Responses

To wrap DeepSeek-R1 in a fully customizable FastAPI service, you can define streaming endpoints for advanced usage. Below is an example that sends chunked responses to the client:

import os
import json
from typing import List
from pydantic import BaseModel
from dotenv import load_dotenv
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from openai import OpenAI

from .utils.prompt import ClientMessage, convert_to_openai_messages
from .utils.tools import get_current_weather  # example tool
from .utils.tools import available_tools  # hypothetical dict of tool funcs

load_dotenv(".env.local")

app = FastAPI()
client = OpenAI(api_key="ollama", base_url="http://localhost:11434/v1/")

class Request(BaseModel):
    messages: List[ClientMessage]

def stream_text(messages: List[ClientMessage], protocol: str = 'data'):
    stream = client.chat.completions.create(
        messages=messages,
        model="deepseek-r1",
        stream=True,
    )

    if protocol == 'text':
        for chunk in stream:
            for choice in chunk.choices:
                if choice.finish_reason == "stop":
                    break
                else:
                    yield "{text}".format(text=choice.delta.content)

    elif protocol == 'data':
        draft_tool_calls = []
        draft_tool_calls_index = -1

        for chunk in stream:
            for choice in chunk.choices:
                if choice.finish_reason == "stop":
                    continue
                elif choice.finish_reason == "tool_calls":
                    for tool_call in draft_tool_calls:
                        yield f'9:{{"toolCallId":"{tool_call["id"]}","toolName":"{tool_call["name"]}","args":{tool_call["arguments"]}}}\n'

                    for tool_call in draft_tool_calls:
                        tool_result = available_tools[tool_call["name"]](**json.loads(tool_call["arguments"]))
                        yield (
                            f'a:{{"toolCallId":"{tool_call["id"]}","toolName":"{tool_call["name"]}","args":{tool_call["arguments"]},'
                            f'"result":{json.dumps(tool_result)}}}\n'
                        )
                elif choice.delta.tool_calls:
                    for tool_call in choice.delta.tool_calls:
                        id = tool_call.id
                        name = tool_call.function.name
                        arguments = tool_call.function.arguments
                        if id is not None:
                            draft_tool_calls_index += 1
                            draft_tool_calls.append({"id": id, "name": name, "arguments": ""})
                        else:
                            draft_tool_calls[draft_tool_calls_index]["arguments"] += arguments
                else:
                    yield f'0:{json.dumps(choice.delta.content)}\n'

            # usage
            if chunk.choices == []:
                usage = chunk.usage
                prompt_tokens = usage.prompt_tokens
                completion_tokens = usage.completion_tokens
                yield (
                    f'd:{{"finishReason":"{"tool-calls" if len(draft_tool_calls) > 0 else "stop"}",'
                    f'"usage":{{"promptTokens":{prompt_tokens},"completionTokens":{completion_tokens}}}}}\n'
                )

@app.post("/api/chat")
async def handle_chat_data(request: Request, protocol: str = Query('data')):
    messages = request.messages
    openai_messages = convert_to_openai_messages(messages)
    response = StreamingResponse(stream_text(openai_messages, protocol))
    response.headers['x-vercel-ai-data-stream'] = 'v1'
    return response

Key Points:

  • stream=True allows the server to stream content chunk by chunk.
  • The code handles optional “tool calls” logic—customizable for your own environment.
  • FastAPI’s StreamingResponse ensures the client receives partial output in real time.

With this setup, you can embed DeepSeek-R1 into more complex microservices or orchestrate multi-step workflows that rely on streaming LLM responses.


5. Conclusion

DeepSeek-R1 combined with Ollama and FastAPI gives you a powerful local LLM service. You can handle all aspects of data ingestion, retrieval, and inference in one place—without relying on third-party endpoints or paying subscription costs. Here’s a recap:

  • Ollama manages downloading and serving the DeepSeek-R1 models.
  • FastAPI provides a flexible web layer for streaming responses or building microservices.

Build your local AI solutions confidently and privately—DeepSeek-R1 is now at your fingertips.

Powering Quant Finance with Qlib’s PyTorch MLP on Alpha360

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

Qlib is an AI-oriented, open-source platform from Microsoft that simplifies the entire quantitative finance process. By leveraging PyTorch, Qlib can seamlessly integrate modern neural networks—like Multi-Layer Perceptrons (MLPs)—to process large datasets, engineer alpha factors, and run flexible backtests. In this post, we focus on a PyTorch MLP pipeline for Alpha360 data in the US market, examining a single YAML configuration that unifies data ingestion, model training, and performance evaluation.

Harnessing AI for Quantitative Finance with Qlib and LightGBM

· 6 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

In the realm of quantitative finance, machine learning and deep learning are revolutionizing how researchers and traders discover alpha, manage portfolios, and adapt to market shifts. Qlib by Microsoft is a powerful open-source framework that merges AI techniques with end-to-end finance workflows.

This article demonstrates how Qlib automates an AI-driven quant workflow—from data ingestion and feature engineering to model training and backtesting—using a single YAML configuration for a LightGBM model. Specifically, we’ll explore the AI-centric aspects of how qrun orchestrates the entire pipeline and highlight best practices for leveraging advanced ML models in your quantitative strategies.

Correct Exchange Mapping in VeighNa to Resolve IB Security Definition Errors

· 14 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

In the intricate world of algorithmic trading, seamless integration between trading platforms and broker APIs is paramount.

One common issue when interfacing with Interactive Brokers (IB) API is encountering the error:

ERROR:root:Error - ReqId: 1, Code: 200, Message: No security definition has been found for the request

This error typically arises due to incorrect exchange mapping, preventing Interactive Brokers (IB) from recognizing the requested security. This article delves into the importance of accurate exchange mapping within the VeighNa trading platform, provides a detailed overview of IB's symbol rules, explains the updatePortfolio method, and offers guidance on implementing correct mappings to avoid such errors.

Understanding the Sniper Algorithm Implementation in Algorithmic Trading

· 8 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

In the realm of algorithmic trading, execution algorithms play a pivotal role in optimizing trade orders to minimize market impact and slippage. One such algorithm is the Sniper Algorithm, which is designed to execute trades discreetly and efficiently by capitalizing on favorable market conditions.

This article aims to review and understand the implementation of the Sniper Algorithm as provided in the VeighNa trading platform's open-source repository. By dissecting the code and explaining its components, we hope to provide clarity on how the algorithm functions and how it can be utilized in practical trading scenarios.

Backtesting NVIDIA Stock Strategies on VeighNa - Moving Average Crossover Strategy

· 15 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

Backtesting is essential for validating trading strategies, especially in the high-frequency and volatile world of stocks like NVIDIA (NVDA). Using VeighNa, an open-source algorithmic trading system, provides traders with the flexibility to thoroughly test strategies and optimize for performance. In this guide, we'll walk through setting up VeighNa, backtesting a simple Moving Average Crossover strategy on NVIDIA, explaining the strategy in detail, troubleshooting common installation issues, and optimizing your strategy.

Automating Financial Data Collection and Uploading to Hugging Face for Algorithmic Trading

· 6 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

In the fast-paced world of algorithmic trading, accessing reliable and timely financial data is essential for backtesting strategies, optimizing models, and making data-driven trading decisions. Automating data collection can streamline your workflow and ensure that you have access to the most recent market information. In this guide, we’ll walk through how to automate the collection of stock data using Python and yfinance, and how to upload this data to Hugging Face for convenient access and future use.

Although this article uses NVIDIA stock data as an example, the process is applicable to any publicly traded company or financial instrument. By integrating data collection and storage into one automated pipeline, traders and analysts can focus on what matters most—developing strategies and maximizing returns.

Algorithmic Trading with VeighNa and Interactive Brokers - Installation Guide and Troubleshooting

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

Algorithmic trading is transforming the financial landscape, and frameworks like VeighNa combined with Interactive Brokers (IB) offer traders the tools they need to optimize their trading strategies and automate execution across global markets. However, setting up these tools on macOS, particularly on Apple Silicon (M1/M2), can be tricky due to package compatibility issues. This guide will walk you through the installation process of VeighNa with IB on macOS, highlighting all the potential gotchas we encountered, along with their solutions.