
Tokio vs tokio-stream in WebSocket adapters - stream-first vs select!

· 6 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

TL;DR

  • Tokio is the runtime and low-level primitives (tasks, I/O, timers, channels, tokio::select!).
  • tokio-stream is an optional companion that:
    • wraps Tokio primitives into Streams (e.g., ReceiverStream, BroadcastStream, IntervalStream);
    • provides combinators (map, filter, merge, timeout, throttle, chunks_timeout, StreamMap) for declarative event pipelines.
  • If your adapter pulls from channels with recv().await and coordinates with select!, you usually don’t need tokio-stream.
  • If your adapter exposes or composes Streams (fan-in, time windows, per-item timeouts, etc.), you do.

What each crate gives you

Tokio (runtime + primitives)

  • #[tokio::main], tokio::spawn, tokio::select!
  • Channels: tokio::sync::{mpsc, broadcast, watch, oneshot}
  • Time: tokio::time::{sleep, interval, timeout}
  • Signals: tokio::signal
  • Typical style: “manual pump” with recv().await inside a select! loop.

tokio-stream (adapters + combinators)

  • Wrappers (Tokio → Stream):
    • wrappers::ReceiverStream<T> wraps mpsc::Receiver<T>
    • wrappers::UnboundedReceiverStream<T> wraps mpsc::UnboundedReceiver<T>
    • wrappers::BroadcastStream<T> wraps broadcast::Receiver<T>
    • wrappers::WatchStream<T> wraps watch::Receiver<T>
    • wrappers::IntervalStream wraps tokio::time::Interval
  • Combinators via StreamExt: next, map, filter, merge (with SelectAll), StreamMap (keyed fan-in), and time-aware ops (timeout, throttle, chunks_timeout) when the crate’s time feature is enabled.
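
For instance, StreamMap gives keyed fan-in where each item arrives tagged with the key of the stream that produced it. A minimal sketch (the keys and payloads are illustrative):

use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap};

#[tokio::main]
async fn main() {
    let (tx_user, rx_user) = tokio::sync::mpsc::channel::<String>(16);
    let (tx_order, rx_order) = tokio::sync::mpsc::channel::<String>(16);
    tokio::spawn(async move { let _ = tx_user.send("u1".into()).await; });
    tokio::spawn(async move { let _ = tx_order.send("o1".into()).await; });

    let mut sources = StreamMap::new();
    sources.insert("user", ReceiverStream::new(rx_user));
    sources.insert("order", ReceiverStream::new(rx_order));

    // Each item is yielded together with the key of its source stream.
    while let Some((key, msg)) = sources.next().await {
        println!("{key}: {msg}");
    }
}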

Two idioms for adapters (with complete snippets)

1) Channel + select! (“manual pump”) — no tokio-stream needed

use tokio::{select, signal, sync::mpsc};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = mpsc::channel::<String>(1024);

    // Example producer
    tokio::spawn(async move {
        let _ = tx.send("hello".to_string()).await;
    });

    // ctrl_c() returns a (non-Unpin) future; pin it so it can be polled
    // across loop iterations with `&mut sigint`.
    let sigint = signal::ctrl_c();
    tokio::pin!(sigint);

    loop {
        select! {
            maybe = rx.recv() => {
                match maybe {
                    Some(msg) => { tracing::info!("msg: {msg}"); }
                    None => break, // channel closed
                }
            }
            _ = &mut sigint => {
                tracing::info!("shutting down");
                break;
            }
            else => break,
        }
    }

    Ok(())
}

Pros

  • Minimal dependencies, explicit control and shutdown.
  • Clear backpressure semantics via channel capacity.

Cons

  • Fan-in across many/dynamic sources is verbose.
  • Transformations (map/filter/batch) are hand-rolled.

2) Stream-first (uses tokio-stream)

use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::{
    wrappers::{IntervalStream, ReceiverStream},
    StreamExt, // for .next() and combinators
};

enum AdapterEvent { User(String), Order(String), Heartbeat }

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx_user, rx_user) = mpsc::channel::<String>(1024);
    let (tx_order, rx_order) = mpsc::channel::<String>(1024);

    // Example producers
    tokio::spawn(async move { let _ = tx_user.send("u1".into()).await; });
    tokio::spawn(async move { let _ = tx_order.send("o1".into()).await; });

    let ticker = tokio::time::interval(Duration::from_secs(1));

    let users = ReceiverStream::new(rx_user).map(AdapterEvent::User);
    let orders = ReceiverStream::new(rx_order).map(AdapterEvent::Order);
    let beats = IntervalStream::new(ticker).map(|_| AdapterEvent::Heartbeat);

    // Compose: merge multiple sources and shape the flow
    let events = users
        .merge(orders)
        .merge(beats)
        .throttle(Duration::from_millis(20)); // needs tokio-stream's "time" feature
    // Throttle is not Unpin, so pin the composed stream before iterating.
    tokio::pin!(events);

    while let Some(ev) = events.next().await {
        match ev {
            AdapterEvent::User(v) => tracing::info!("user: {v}"),
            AdapterEvent::Order(v) => tracing::info!("order: {v}"),
            AdapterEvent::Heartbeat => tracing::debug!("tick"),
        }
    }

    Ok(())
}

Pros

  • Concise fan-in and transforms (filter/map/batch/timeout).
  • Natural fit when returning impl Stream<Item = Event> to consumers.

Cons

  • Adds one dependency; slightly different ownership/lifetimes vs bare Receiver.

Side-by-side: when to use which

| Aspect | Channel + tokio::select! (no tokio-stream) | Stream-first (uses tokio-stream) | What the dependency implies |
| --- | --- | --- | --- |
| Why it's used | Pull from channels via recv().await, coordinate with select!. | Wrap Tokio primitives as Streams and/or use combinators. | Presence of tokio-stream signals a stream-centric composition. |
| Primary abstraction | Futures + channels + select!. | Stream<Item = T> + wrappers + StreamExt. | Stream API → extra crate. |
| Typical code | while let Some(x) = rx.recv().await {}, select! { ... } | ReceiverStream::new(rx).map(...).merge(...).next().await | Wrappers/combinators imply tokio-stream. |
| Fan-in / merging | Manual select! arms; verbose for many/dynamic sources. | merge, SelectAll, or StreamMap for succinct fan-in. | tokio-stream buys tools for multiplexing. |
| Timers / heartbeats | interval() polled in loops. | IntervalStream + timeout/throttle/chunks_timeout. | Time-aware ops rely on tokio-stream + features. |
| Public API shape | Pull: async fn next_event() -> Option<T>. | Stream: fn into_stream(self) -> impl Stream<Item = T>. | Exposing a stream often requires the crate. |
| Composability | Hand-rolled transforms. | One-liners with StreamExt (map/filter/batch). | Enables declarative pipelines. |
| Backpressure | Channel capacity governs it; explicit. | Same channels underneath; wrappers don't change capacity. | Neutral; it's about ergonomics. |
| Fairness/ordering | select! randomizes fairness per iteration. | Per-stream order preserved; cross-stream order depends on combinator. | Document semantics either way. |
| Testability | Manual harnesses around loops. | .take(n), .collect::<Vec<_>>(), etc. | Stream APIs are often easier to test. |
| Cost / deps | Lean; no extra crate. | Adds tokio-stream; thin adapter overhead. | Main cost is dependency surface. |

Design recipes (complete, paste-ready)

A) Channel-first everywhere (leanest; drop tokio-stream)

  • Keep a pull API like next_event().
  • Use tokio::time::timeout for per-item deadlines.
use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};

pub async fn pump_with_timeout(mut rx: mpsc::Receiver<String>) -> anyhow::Result<()> {
    loop {
        match timeout(Duration::from_secs(5), rx.recv()).await {
            Ok(Some(msg)) => tracing::info!("msg: {msg}"),
            Ok(None) => break, // channel closed
            Err(_) => tracing::warn!("no event within 5s"),
        }
    }
    Ok(())
}

B) Offer both (feature-gated Stream API)

Cargo.toml

[features]
default = []
stream-api = ["tokio-stream"]

[dependencies]
tokio = { version = "1", features = ["rt-multi-thread","macros","sync","time","signal"] }
tokio-stream = { version = "0.1", optional = true }

Client

#[cfg(feature = "stream-api")]
use tokio_stream::wrappers::ReceiverStream;

pub struct Client {
    rx_inbound: tokio::sync::mpsc::Receiver<MyEvent>,
}

impl Client {
    pub async fn next_event(&mut self) -> Option<MyEvent> {
        self.rx_inbound.recv().await
    }

    #[cfg(feature = "stream-api")]
    pub fn into_stream(self) -> ReceiverStream<MyEvent> {
        ReceiverStream::new(self.rx_inbound)
    }
}

C) Stream-first everywhere (plus pull convenience)

  • Internally fan-out via broadcast so multiple consumers can subscribe.
use tokio::sync::{broadcast, mpsc};
use tokio_stream::wrappers::BroadcastStream;

pub struct Client {
    rx_inbound: mpsc::Receiver<Event>,     // pull path
    bus: broadcast::Sender<Event>,         // stream path
    _reader: tokio::task::JoinHandle<()>,
}

impl Client {
    pub async fn next_event(&mut self) -> Option<Event> {
        self.rx_inbound.recv().await
    }

    pub fn event_stream(&self) -> BroadcastStream<Event> {
        BroadcastStream::new(self.bus.subscribe())
    }
}

D) Expose a Stream without tokio-stream

  • Implement Stream directly over mpsc::Receiver via poll_recv.
use futures_core::Stream;
use pin_project_lite::pin_project;
use std::{pin::Pin, task::{Context, Poll}};
use tokio::sync::mpsc;

pin_project! {
    pub struct EventStream<T> {
        #[pin]
        rx: mpsc::Receiver<T>,
    }
}

impl<T> EventStream<T> {
    pub fn new(rx: mpsc::Receiver<T>) -> Self { Self { rx } }
}

impl<T> Stream for EventStream<T> {
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().rx.poll_recv(cx)
    }
}
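
Consumers can then drive it with the futures crate's StreamExt, still without tokio-stream. A minimal sketch, assuming the EventStream above is in scope:

use futures_util::StreamExt; // provides .next() for any Stream + Unpin

async fn consume(rx: tokio::sync::mpsc::Receiver<String>) {
    let mut events = EventStream::new(rx);
    while let Some(msg) = events.next().await {
        tracing::info!("event: {msg}");
    }
}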

Performance, backpressure, ordering

  • Overhead: ReceiverStream is a thin adapter; hot-path costs are typically parsing/allocations, not the wrapper.
  • Backpressure: unchanged—governed by channel boundedness and consumer speed.
  • Ordering: per-stream order is preserved; merged streams don’t guarantee global order—timestamp if strict ordering matters.
  • Fairness: tokio::select! randomizes branch polling; stream fan-in fairness depends on the specific combinator (merge, SelectAll, StreamMap).
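
If cross-stream ordering matters downstream, one lightweight option is to stamp items as they are observed and let consumers reason about arrival time. A minimal sketch (the function name is illustrative):

use std::time::Instant;
use tokio_stream::{Stream, StreamExt};

// Tag every item with the instant it was observed by this stage.
fn stamped<S: Stream>(s: S) -> impl Stream<Item = (Instant, S::Item)> {
    s.map(|item| (Instant::now(), item))
}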

A quick decision checklist

  • Need to return impl Stream<Item = Event> or use stream combinators? → Use tokio-stream.
  • Only need a single event loop with recv().await and select!? → Tokio alone is fine.
  • Want both ergonomics and lean defaults? → Feature-gate a stream view (stream-api).

Hyperliquid Gasless Trading – Deep Comparison, Fees, and 20 Optimized Strategies

· 6 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

TL;DR Hyperliquid runs its own Layer-1 with two execution domains:

  • HyperCore — native on-chain central limit order book (CLOB), margin, funding, liquidations.
  • HyperEVM — standard EVM runtime (gas metered, paid in HYPE).

Trading on HyperCore is gasless: orders, cancels, TP/SL, TWAP, Scale ladders, etc. are signed actions included in consensus, not EVM transactions.

  • You don’t need HYPE to place/cancel orders.
  • You pay maker/taker fees and funding, not gas.
  • Spam is mitigated with address budgets, rate limits, open-order caps.
  • If you need more throughput: buy request weight at $0.0005 per action.

The design enables CEX-style strategies (dense ladders, queue dancing, rebates, hourly hedging) without the friction of gas.

Official GitHub repos:


1. How “gasless” works

Order lifecycle

Wallet signs payload  →  Exchange endpoint → Node → Validators (HyperBFT)
↘ deterministic inclusion into HyperCore state
  • Signatures, not transactions. Your wallet signs payloads (EIP-712 style). These are posted to the Exchange endpoint, gossiped to validators, ordered in consensus, and applied to HyperCore. → No gas, just signature.

  • Onboarding. Enable trading = sign once. Withdrawals = flat $1 fee, not a gas auction. Docs → Onboarding

  • Spam protection.

    • Address budgets: 10k starter buffer, then 1 action per 1 USDC lifetime fills.
    • Open-order cap: base 1,000 → scales to 5,000.
    • Congestion fairness: max 2× maker-share per block.
    • ReserveRequestWeight: buy capacity at $0.0005/action. Docs → Rate limits
  • Safety rails.

    • scheduleCancel (dead-man’s switch)
    • expiresAfter (time-box an action)
    • noop (nonce invalidation)
  • Order types. Market, Limit, ALO (post-only), IOC, GTC, TWAP, Scale, TP/SL (market or limit), OCO. Docs → Order types

  • Self-trade prevention. Expire-maker: cancels resting maker side instead of self-fill. Docs → STP
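
A back-of-the-envelope check of the address budget quoted above (10k starter buffer plus one action of budget per 1 USDC of lifetime fills); the function name and the figures in the example call are illustrative:

def remaining_actions(lifetime_usdc_filled: float, actions_used: int,
                      starter_buffer: int = 10_000) -> float:
    # 1 action of budget per 1 USDC of lifetime filled volume, on top of the buffer
    budget = starter_buffer + lifetime_usdc_filled
    return budget - actions_used

# A bot with $250k of lifetime fills that has already sent 180k actions:
print(remaining_actions(250_000, 180_000))  # -> 80000.0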


2. Fees: Hyperliquid vs DEXes & CEXes

Perps (base tiers)

| Venue | Maker | Taker | Notes |
| --- | --- | --- | --- |
| Hyperliquid | 0.015% | 0.045% | Gasless actions; staking discounts up to 40%; rebates up to –0.003% |
| dYdX v4 | 0.01% | 0.05% | Gasless submits/cancels; fills only |
| GMX v2 (perps) | 0.04–0.06% | 0.04–0.06% | Round-trip 0.08–0.12% + funding/borrow + L2 gas |
| Binance Futures | ~0.018% | ~0.045% | VIP/BNB discounts; USDC-M can hit 0% maker |
| Bybit Perps | 0.020% | 0.055% | Tiered; VIP reductions |
| OKX Futures | 0.020% | 0.050% | VIP can reach –0.005% / 0.015% |
| Kraken Futures | 0.020% | 0.050% | Down to 0% / 0.01% at scale |

Spot

| Venue | Maker | Taker | Gas |
| --- | --- | --- | --- |
| Hyperliquid | 0.040% | 0.070% | Gasless actions; $1 withdraw |
| Uniswap v3 | 0.01–1% | 0.01–1% | User pays gas; or solver embeds in price |
| Bybit Spot | 0.15% | 0.10–0.20% | CEX; no gas |
| OKX Spot | 0.08% | 0.10% | VIP/OKB discounts |

3. Funding models

  • Hyperliquid: 8h rate paid hourly (1/8 each hour). Hyperps use EMA mark (oracle-light).
  • dYdX v4: hourly funding; standard premium/interest.
  • GMX v2: continuous borrow vs pool imbalance.

4. What gasless enables (tactically)

  • Dense ladders + queue dancing: cheap to modify/cancel 1000s of levels.
  • Granular hedging: rebalance perps/spot hedges hourly without friction.
  • CEX-style STP + ALO: protect queue priority.
  • Deterministic inclusion: HyperBFT ensures one global order sequence.
  • Predictable scaling: buy request weight explicitly instead of gas auction.

5. Ten core strategies

  1. Passive Maker Ladder (ALO + STP) – Build dense post-only ladders, earn spread + rebates, cancel/repost gas-free.

  2. Rebate Farming (maker-share) – Hit ≥0.5%, 1.5%, 3% maker volume shares to unlock –0.001%/–0.002%/–0.003%.

  3. Funding-Arb / Cash-and-Carry – Long spot vs short perp; rebalance hourly gas-free.

  4. TWAP Execution – Use native 30s slice TWAP with slippage caps; gasless param tweaks.

  5. Scale Order Grids – Deploy wide grids with up to 5k resting orders; adjust spacing by ATR.

  6. Latency-Aware MM – Run node, use noop for stale nonces.

  7. OCO Risk-Boxing (TP/SL) – Parent-linked stops/targets; frequent adjustment gasless.

  8. Hyperps Momentum/Fade – Trade EMA-based hyperps; funding skew stabilizes. Turnkey repo

  9. Dead-Man's Switch Hygiene – Always use scheduleCancel; pair with expiresAfter.

  10. Throughput Budgeting – Add logic to purchase reserveRequestWeight at spikes.


6. Ten advanced strategies

  1. Maker-Skewed Basis Harvest – Hedge legs passively, collect rebates + funding.

  2. Adaptive Spread Ladder – Contract/expand quotes with realized vol; keep order count fixed.

  3. Queue-Position Arbitrage – Gasless modify to overtake by 1 tick; requires local queue estimation.

  4. Stale-Quote Punisher – Flip passive→taker when off-chain anchors are stale.

  5. Rebate-Neutral Market Impact Hedger – Pre-compute edge ≈ (S/2 − A − f_m); trade only when ≥0.

  6. Funding Skew Swing-Trader – Switch between mean-revert & trend based on funding drift.

  7. Dead-Man Sessioner – Each session starts with scheduleCancel(t) to avoid zombie orders.

  8. Liquidity Layer Splitter – Spread ladders across accounts; use STP to avoid self-trades.

  9. Cross-Venue Micro-Arb – HL vs CEX/DEX; taker on mispriced side, maker on the other.

  10. Event-Mode Capacity Burst – Pre-buy request weight pre-CPI/FOMC; change ladder parameters.


7. Cost sanity check ($100k notional)

  • Hyperliquid: 0.015% maker ($15) + 0.045% taker ($45) = $60 (+ funding).
  • dYdX v4: 0.01% + 0.05% = $60.
  • GMX v2: 0.04–0.06% open + 0.04–0.06% close = $80–120 (+ borrow + gas).
  • Binance Futures: 0.018% + 0.045% ≈ $63 (base VIP).
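
These round-trip figures are easy to reproduce; a quick sketch with fees expressed in basis points, using the base tiers quoted above:

def round_trip_cost(notional: float, maker_bps: float, taker_bps: float) -> float:
    # One maker fill to open plus one taker fill to close; funding/borrow/gas excluded
    return notional * (maker_bps + taker_bps) / 10_000

print(round_trip_cost(100_000, 1.5, 4.5))  # Hyperliquid base tier -> 60.0
print(round_trip_cost(100_000, 1.0, 5.0))  # dYdX v4              -> 60.0
print(round_trip_cost(100_000, 1.8, 4.5))  # Binance Futures      -> 63.0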

8. Implementation gotchas

  • Budgets & caps: track in code; cancels have higher allowance; throttling needed.
  • Min sizes: perps $10 notional; spot 10 quote units.
  • ExpiresAfter: avoid triggering (5× budget cost).
  • Node ops: run Linux, open ports 4001/4002, colocate in Tokyo.
  • Nonces: prefer modify; use noop if stuck.

9. Comparison snapshot

  • Hyperliquid & dYdX v4 — gasless trading actions, on-chain CLOB, deterministic finality.
  • UniswapX / CoW — user-gasless via solver; solver pays gas, embeds in your price.
  • Uniswap v3/v4, GMX — user pays gas + pool fee; MEV & slippage dominate costs.
  • CEXes — no gas, lowest fees at VIP, fiat rails; but centralized custody.

10. GitHub Index


Bottom Line

Hyperliquid takes gas out of the trading loop, letting traders focus on fees, funding, latency, and inventory control. The result: a CEX-like experience with on-chain transparency.

Best use cases:

  • High-frequency maker strategies (queue-dancing, rebates).
  • Funding arbitrage with fine-grained rebalancing.
  • Event-driven hedging.
  • Developers who want to build bots in Python/Rust/TS/Go without juggling gas balances.

Slaying Bullish Bias - A Market Wizards Playbook

· 8 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

“The markets are never wrong; opinions often are.”
—Jesse Livermore (quoted by Bruce Kovner in Market Wizards)

2025 is a cognitive trap for equity bulls. The Ukraine front barely moves, President Trump’s blanket 10 % tariff rattles importers, and German GDP just printed –0.6 % QoQ—yet the S&P 500 hovers north of 5 500.
If that disconnect feels comfortable, your built-in bullish bias (the reflex that “prices should rise”) is probably steering the wheel.

Below you’ll find the fully annotated 30-question audit that the original Market Wizards might run if they sat at your terminal today. Each line now includes:

  • Wizard Insight – the lesson Schwager’s interviewees hammered home.
  • 2025 Angle – why the trap is live right now.
  • Real-World Example – an actual 2025 tape or trade vignette.

Paste the checklist into your trading journal, sprint through one block per week, and watch your P/L detach from hope-fuelled drift.


1 Self-Diagnosis & Mind-Set

| # | Question | Wizard Insight | 2025 Angle | Real-World Example |
| --- | --- | --- | --- | --- |
| 1 | Do you scan for longs first? | Mark Cook forced students to open a bearish filter before coffee. | All major U.S. broker dashboards open on "Top Gainers." | 11 Mar 2025: NVDA +6 % headlined your grid; bottom losers list showed LUMN –13 % (a better 2-R short you never saw). |
| 2 | 5 % drop—curiosity or dip euphoria? | Paul Tudor Jones cut leverage 50 % within minutes on 19 Oct 1987. | 15 Mar 2025: SPX –5.1 %, VIX 34 → index kept sliding another –2 % before basing. | You felt "great entry" and bought QQQ, stopped out –1 R next day. |
| 3 | Does shorting feel "un-American"? | Tom Baldwin joked "The pits only cheer the upside." | Media framed every 2024 sell-off as "unpatriotic betting." | You posted a bearish tweet on Apple and got piled-on for "fighting innovation." |
| 4 | Dips = noise, rallies = trends? | Ed Seykota logged only % risk and ATR multiples—no adjectives. | CNBC still calls –2 % a "slump" but +2 % a "rally." | 23 Apr 2025 journal: "just a blip lower" (SPX –1.8 %), "solid up-trend" (+1.6 %). |
| 5 | Is self-worth tied to rising curves? | Seykota kept family money in T-Bills. | Real college costs +6 % YoY; equity drift no longer guarantees coverage. | You increased size after your kid's tuition invoice hit inbox. |

2 Historical Perspective & Narrative Traps

| # | Question | Wizard Insight | 2025 Angle | Real-World Example |
| --- | --- | --- | --- | --- |
| 6 | How did you fare in each mini-crash? | Jones was green in '87; Raschke flat in '98. | 2022 bear (–27 %) still on broker statement. | Your 2022 curve: –18 % vs CTA index +13 %. |
| 7 | Tested your edge with drift = 0? | Seykota's systems worked on pork bellies—no drift. | Forward SPX drift est. < 4 %. | Your momentum back-test Sharpe fell from 1.2 ➜ 0.48. |
| 8 | Rely on "Don't bet against America"? | Kovner warns empires rotate. | Proposed 2 % buy-back tax in House bill HR-1735. | Removing buy-backs in DCF knocked 7 turns off Apple PE. |
| 9 | Ignoring survivorship in Wizard lore? | Schwager himself says thousands blew up. | TikTok "profit porn" hides losers. | Your Telegram group shares only green screenshots. |
| 10 | Studied markets that never bounced? | Japanese believers held Nikkei bags for 34 yrs. | Greek ASE –85 % from '07 peak even now. | Your Europe ETF overweight assumes 7 % CAGR. |

3 Quantitative Evidence

| # | Question | Wizard Insight | 2025 Angle | Real-World Example |
| --- | --- | --- | --- | --- |
| 11 | Shorts share of tickets & P/L? | Cook: "Trade both sides or half your vision is gone." | Q1-25 had strongest 3-day down-impulse since Covid lows. | 9 shorts out of 112 trades; net P/L –2 R. |
| 12 | Invert your long signal—result? | Seykota's "whipsaw song" works both ways. | High-short-interest anomaly revived with expensive rates. | Inverted signal on same universe scored Sharpe 0.32. |
| 13 | Price vs log-return testing? | Wizards think in % risk. | Nasdaq 100 raw-point rise masks compounding. | Strategy CAGR fell from 18 % ➜ 11 % in log mode. |
| 14 | Stop symmetry? | Raschke: 2 ATR both sides. | Meme squeezes tempt 1 ATR shorts, 3 ATR longs. | Last month: 6 short stop-outs at –1 ATR, 2 long at –3 ATR. |
| 15 | Monte-Carlo μ = 0 survival? | Jones funds vol desks to weather drift drought. | Commodity volatility doubles path risk now. | 10 000 paths: median curve flatlines by month 22. |

4 Risk & Capital Allocation

| # | Question | Wizard Insight | 2025 Angle | Real-World Example |
| --- | --- | --- | --- | --- |
| 16 | Exposure cap symmetric? | Seykota could flip net ±200 %. | Short borrow fees sub-1 % for 80 % of S&P names. | You allow +150 % long, –25 % short. |
| 17 | Averaging down losers? | Kovner: "Losers average losers." | AI chip names drop 18 % intraday regularly. | Added twice to AMD at –3 % and –6 %; closed –2 R. |
| 18 | Cover shorts first in vol spikes? | Tudor held shorts through crash until vol bled. | Post-VIX-34 drift negative for 12 sessions. | Closed TSLA short on spike, kept long tech—lost 1.4 R. |
| 19 | Put hedge value? | Jones buys vol only when skew cheap. | 1-month ATM put cost 1.8 % in Mar 2025. | Last year: spent 3.4 R in premium, saved 1.1 R in crashes. |
| 20 | Squeezes breach worst-case loss? | Baldwin sized by dollar vol. | Feb 2025 GME +40 % gap. | Short lost 2.3 R overnight. |

5 Process & Decision Architecture

| # | Question | Wizard Insight | 2025 Angle | Real-World Example |
| --- | --- | --- | --- | --- |
| 21 | UI bias toward gainers? | Seykota coded neutral dash. | Broker UIs show green first. | Missed FSLY –12 % fail because list buried. |
| 22 | Short checklist depth? | Raschke rehearses shorts like longs. | Easier borrows post-reg changes. | Long checklist 12 items; short only 5. |
| 23 | Narrative only for shorts? | Wizards trust price. | News calls every dip an "overreaction." | Skipped META short for lack of "fundamental story"; missed –8 %. |
| 24 | Post-mortem balance? | Cook logs every miss. | Feb 2025: three perfect failed-break short signals unreviewed. | Reviewed 7 missed longs, 0 shorts. |
| 25 | Auto-flip after failed breakout? | "Failed move = fast move" —Soros. | AI names fake breakouts weekly. | Long NVDA fake-out –1 R, no flip; price dropped another 4 %. |

6 Psychology & Continuous Improvement

| # | Question | Wizard Insight | 2025 Angle | Real-World Example |
| --- | --- | --- | --- | --- |
| 26 | Bias tags clustering on longs? | Jones hired risk coach. | AI tools auto-tag sentiment now. | 65 % optimism tags on long entries, 15 % on shorts. |
| 27 | Real-time beta alerts? | Tudor's board lit red at β > 0.7. | Slack webhooks trivial. | Hit 0.78 beta on 9 Apr, noticed next day. |
| 28 | Gap-down rehearsal? | Basso ran crash sims monthly. | Turkey ETF gap –12 % overnight, Feb 2025. | Panicked exit + slippage –1 R; never rehearsed scenario. |
| 29 | Forced-flat longs feeling? | Seykota welcomes dry powder. | Broker outage flushed longs 14 Jan. | Felt panic → identity fusion with bull thesis. |
| 30 | Preparing for lower drift? | Wizards add new edges. | Demographics & reshoring compress margins. | Equity CAGR model still at 8 %. |

7 Wrap-Up

Bullish bias survives because it pays most of the time—until it erases years of gains in a single macro grenade.
The Market Wizards neutralised the bias through symmetry: equal screens, stops, reviews, and above all, equal respect for up and down tape.

Run this playbook once per quarter:

  1. Audit each question honestly.
  2. Patch the weakest habit or policy.
  3. Re-test your edge in a zero-drift simulation.

Do that, and the next tariff volley, energy spike, or AI bubble unwind becomes just another tradeable regime—not a career-ending ambush.

Happy (bias-free) trading!

Contributing a Safer MarketIfTouchedOrder to Nautilus Trader — Hardening Conditional Orders in Rust

· 3 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

TL;DR – PR #2577 introduces a fallible constructor, complete domain-level checks, and four focussed tests for MarketIfTouchedOrder, thereby closing long-standing Issue #2529 on order-validation consistency.


1 Background

MarketIfTouchedOrder (MIT) is effectively the reverse of a stop-market order: it lies dormant until price touches a trigger, then fires as an immediate market order.
Because a latent trigger feeds straight into an instant fill path, robust validation is non-negotiable—any silent mismatch becomes a live trade.


2 Why the Change Was Necessary

| Problem | Impact |
| --- | --- |
| Partial positivity checks on quantity, trigger_price, display_qty | Invalid values propagated deep into matching engines before exploding |
| TimeInForce::Gtd accepted expire_time = None | Programmer thought they had "good-til-date"; engine treated it as GTC |
| No check that display_qty ≤ quantity | Iceberg slice could exceed total size, leaking full inventory |
| Legacy new API only panicked | Call-site couldn't surface errors cleanly |

Issue #2529 demanded uniform, fail-fast checks across all order types; MIT was first in line.


3 What PR #2577 Delivers

| Area | Before (v0) | After (v1) |
| --- | --- | --- |
| Constructor | new → panic on error | new_checked → anyhow::Result<Self>; new now wraps it |
| Positivity checks | Partial | Guaranteed for quantity, trigger_price, (optional) display_qty |
| GTD orders | expire_time optional | Required when TIF == GTD |
| Iceberg rule | None | display_qty ≤ quantity |
| Error channel | Opaque panics | Precise anyhow::Error variants |
| Tests | 0 | 4 rstest cases (happy-path + 3 failure modes) |

Diff stats: +159 / −13 – one file crates/model/src/orders/market_if_touched.rs.


4 File Walkthrough Highlights

  1. new_checked – all domain guards live here; returns Result.
  2. Guard helpers – re-uses check_positive_quantity, check_positive_price, check_predicate_false.
  3. Legacy compatibility – new() simply calls Self::new_checked(...).expect(FAILED).
  4. apply() tweak – slippage is recomputed immediately after a fill event.
  5. Tests – ok, quantity_zero, gtd_without_expire, display_qty_gt_quantity.

6 Order-Lifecycle Diagram


7 Using the New API

let mit = MarketIfTouchedOrder::new_checked(
    trader_id,
    strategy_id,
    instrument_id,
    client_order_id,
    OrderSide::Sell,
    qty,
    trigger_price,
    TriggerType::LastPrice,
    TimeInForce::Gtc,
    None,         // expire_time
    false, false, // reduce_only, quote_quantity
    None, None,   // display_qty, emulation_trigger
    None, None,   // trigger_instrument_id, contingency_type
    None, None,   // order_list_id, linked_order_ids
    None,         // parent_order_id
    None, None,   // exec_algorithm_id, params
    None,         // exec_spawn_id
    None,         // tags
    init_id,
    ts_init,
)?;

Prefer new_checked in production code; if you stick with new, you’ll still get clearer panic messages.


8 Impact & Next Steps

  • Fail-fast safety – all invariants enforced before the order leaves your code.
  • Granular error reporting – propagate Result outward instead of catching panics.
  • Zero breaking changes – legacy code continues to compile.

Action items: migrate to new_checked, bubble the Result, and sleep better during live trading.


9 References

| Type | Link |
| --- | --- |
| Pull Request #2577 | https://github.com/nautechsystems/nautilus_trader/pull/2577 |
| Issue #2529 | https://github.com/nautechsystems/nautilus_trader/issues/2529 |

Happy (and safer) trading!

Contributing a Safer LimitIfTouchedOrder to Nautilus Trader — A Small Open-Source Win for Rust Trading

· 3 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

LimitIfTouchedOrder (LIT) is a conditional order that sits between a simple limit order and a stop-limit order: it rests inactive until a trigger price is touched, then converts into a plain limit at the specified limit price. Because it straddles two distinct price levels and multiple conditional flags, robust validation is critical—any silent mismatch can manifest as unwanted executions in live trading.

Pull Request #2533 standardises and hardens the validation logic for LIT orders, bringing it up to the same quality bar as MarketOrder and LimitOrder. The PR was merged into develop on May 1 2025 by @cjdsellers (+207 / −9 across one file).


Why the Change Was Needed

  • Inconsistent invariants – quantity, price, and trigger_price were not always checked for positivity.
  • Edge-case foot-guns – TimeInForce::Gtd could be set with a zero expire_time, silently turning a “good-til-date” order into “good-til-cancel”.
  • Side/trigger mismatch – A BUY order with a trigger above the limit price (or SELL with trigger below limit) yielded undefined behaviour.
  • Developer frustration – Consumers of the SDK had to replicate guard clauses externally; a single canonical constructor removes that burden.

Key Enhancements

| Area | Before | After |
| --- | --- | --- |
| Constructor API | new (panic-on-error) | new_checked (returns Result) + new now wraps it |
| Positivity checks | Only partial | Guaranteed for quantity, price, trigger_price, and optional display_qty |
| Display quantity | Not validated | Must be ≤ quantity |
| GTD orders | No expire validation | Must supply expire_time when TimeInForce::Gtd |
| Side/trigger rule | Undefined | BUY ⇒ trigger ≤ price, SELL ⇒ trigger ≥ price |
| Unit-tests | 0 dedicated tests | 5 focused tests (happy-path + 4 failure modes) |

Implementation Highlights

  1. new_checked – a fallible constructor returning anyhow::Result<Self>. All invariants live here.
  2. Guard helpers – leverages check_positive_quantity, check_positive_price, and check_predicate_false from nautilus_core::correctness.
  3. Legacy behaviour preserved – the original new now calls new_checked().expect("FAILED"), so downstream crates that relied on panics keep working.
  4. Concise Display impl – human-readable string that shows side, quantity, instrument, prices, trigger type, TIF, and status for quick debugging.
  5. Test suite – written with rstest; covers ok, quantity_zero, gtd_without_expire, buy_trigger_gt_price, and sell_trigger_lt_price.

Code diff stats: 207 additions, 9 deletions, affecting crates/model/src/orders/limit_if_touched.rs.


Impact on Integrators

If you only ever called LimitIfTouchedOrder::new, nothing breaks—you’ll merely enjoy better error messages if you misuse the API. For stricter compile-time safety, switch to the new new_checked constructor and handle Result<T> explicitly.

let order = LimitIfTouchedOrder::new_checked(
    trader_id,
    strategy_id,
    instrument_id,
    client_order_id,
    OrderSide::Buy,
    qty,
    limit_price,
    trigger_price,
    TriggerType::LastPrice,
    TimeInForce::Gtc,
    None,         // expire_time
    false, false, // post_only, reduce_only
    false, None,  // quote_qty, display_qty
    None, None,   // emulation_trigger, trigger_instrument_id
    None, None,   // contingency_type, order_list_id
    None,         // linked_order_ids
    None,         // parent_order_id
    None, None,   // exec_algorithm_id, params
    None,         // exec_spawn_id
    None,         // tags
    init_id,
    ts_init,
)?;

Conclusion

PR #2533 dramatically reduces the surface area for invalid LIT orders by centralising all domain rules in a single, auditable place. Whether you’re building discretionary tooling or a fully automated strategy on top of Nautilus Trader, you now get fail-fast behaviour with precise error semantics—no more mystery fills in production.

Next steps: adopt new_checked, make your own wrappers return Result, and enjoy safer trading.


How to Integrate OpenAI TTS with FFmpeg in a FastAPI Service

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

OpenAI offers powerful text-to-speech capabilities, enabling developers to generate spoken audio from raw text. Meanwhile, FFmpeg is the de facto standard tool for audio/video processing—used heavily for tasks like merging audio files, converting formats, and applying filters. Combining these two in a FastAPI application can produce a scalable, production-ready text-to-speech (TTS) workflow that merges and manipulates audio via FFmpeg under the hood.

This article demonstrates how to:

  1. Accept text input through a FastAPI endpoint
  2. Chunk text and use OpenAI to generate MP3 segments
  3. Merge generated segments with FFmpeg (through the pydub interface)
  4. Return or store a final MP3 file, ideal for streamlined TTS pipelines

By the end, you’ll understand how to build a simple but effective text-to-speech microservice that leverages the power of OpenAI and FFmpeg.


1. Why Combine OpenAI and FFmpeg

  • Chunked Processing: Long text might exceed certain API limits or timeouts. Splitting into smaller parts ensures each piece is handled reliably.
  • Post-processing: Merging segments, adding intros or outros, or applying custom filters (such as volume adjustments) becomes trivial with FFmpeg.
  • Scalability: A background task system (like FastAPI’s BackgroundTasks) can handle requests without blocking the main thread.
  • Automation: Minimizes manual involvement—one endpoint can receive text and produce a final merged MP3.

2. FastAPI Endpoint and Background Tasks

Below is the FastAPI code that implements a TTS service using the OpenAI API and pydub (which uses FFmpeg internally). It splits the input text into manageable chunks, generates MP3 files per chunk, then merges them:

import os
import time
import logging
from pathlib import Path

from dotenv import load_dotenv
from fastapi import APIRouter, HTTPException, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from openai import OpenAI
from pydub import AudioSegment

load_dotenv(".env.local")

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
client = OpenAI(api_key=OPENAI_API_KEY)

router = APIRouter()

logging.basicConfig(
    level=logging.DEBUG,  # Set root logger to debug level
    format='%(levelname)s | %(name)s | %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

class AudioRequest(BaseModel):
    input: str

def chunk_text(text: str, chunk_size: int = 4096):
    """
    Generator that yields `text` in chunks of `chunk_size`.
    """
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

@router.post("/speech")
async def generate_speech(request: Request, body: AudioRequest, background_tasks: BackgroundTasks):
    """
    Fires off the TTS request in the background (fire-and-forget).
    Logs are added to track progress. No zip file is created.
    """
    model = "tts-1"
    voice = "onyx"

    if not body.input:
        raise HTTPException(
            status_code=400,
            detail="Missing required field: input"
        )

    # Current time for folder naming or logging
    timestamp = int(time.time() * 1000)

    # Create a folder for storing output
    output_folder = Path(".") / f"speech_{timestamp}"
    output_folder.mkdir(exist_ok=True)

    # Split the input into chunks
    chunks = list(chunk_text(body.input, 4096))

    # Schedule the actual speech generation in the background
    background_tasks.add_task(
        generate_audio_files,
        chunks=chunks,
        output_folder=output_folder,
        model=model,
        voice=voice,
        timestamp=timestamp
    )

    # Log and return immediately
    logger.info(f"Speech generation task started at {timestamp} with {len(chunks)} chunks.")
    return JSONResponse({"detail": f"Speech generation started. Timestamp: {timestamp}"})

def generate_audio_files(chunks, output_folder, model, voice, timestamp):
    """
    Generates audio files for each chunk. Runs in the background.
    After all chunks are created, merges them into a single MP3 file.
    """
    try:
        # Generate individual chunk MP3s
        for index, chunk in enumerate(chunks):
            speech_filename = f"speech-chunk-{index + 1}.mp3"
            speech_file_path = output_folder / speech_filename

            logger.info(f"Generating audio for chunk {index + 1}/{len(chunks)}...")

            response = client.audio.speech.create(
                model=model,
                voice=voice,
                input=chunk,
                response_format="mp3",
            )

            response.stream_to_file(speech_file_path)
            logger.info(f"Chunk {index + 1} audio saved to {speech_file_path}")

        # Merge all generated MP3 files into a single file
        logger.info("Merging all audio chunks into one file...")
        merged_audio = AudioSegment.empty()

        def file_index(file_path: Path):
            # Expects file names like 'speech-chunk-1.mp3'
            return int(file_path.stem.split('-')[-1])

        sorted_audio_files = sorted(output_folder.glob("speech-chunk-*.mp3"), key=file_index)
        for audio_file in sorted_audio_files:
            chunk_audio = AudioSegment.from_file(audio_file, format="mp3")
            merged_audio += chunk_audio

        merged_output_file = output_folder / f"speech-merged-{timestamp}.mp3"
        merged_audio.export(merged_output_file, format="mp3")
        logger.info(f"Merged audio saved to {merged_output_file}")

        logger.info(f"All speech chunks generated and merged for timestamp {timestamp}.")
    except Exception as e:
        logger.error(f"OpenAI error (timestamp {timestamp}): {e}")

Key Takeaways

  • AudioRequest model enforces the presence of an input field.
  • chunk_text ensures no chunk exceeds 4096 characters (you can adjust this size).
  • BackgroundTasks offloads the TTS generation so the API can respond promptly.
  • pydub merges MP3 files (which in turn calls FFmpeg).

3. Using FFmpeg Under the Hood

pydub relies on FFmpeg for decoding and encoding audio, so FFmpeg must be installed on your system. Ensure FFmpeg is in your PATH—otherwise you’ll get errors when merging or saving MP3 files. For Linux (Ubuntu/Debian):

sudo apt-get update
sudo apt-get install ffmpeg

For macOS (using Homebrew):

brew install ffmpeg

If you’re on Windows, install FFmpeg from FFmpeg’s official site or use a package manager like Chocolatey or Scoop.


4. Mermaid JS Diagram

Below is a Mermaid sequence diagram illustrating the workflow:
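
A minimal reconstruction of that diagram, following the numbered steps below:

sequenceDiagram
    participant U as User
    participant A as FastAPI
    participant O as OpenAI TTS
    participant P as pydub / FFmpeg
    U->>A: POST /speech (text)
    A-->>U: JSON ack with timestamp
    A->>A: BackgroundTask splits text into chunks
    loop each chunk
        A->>O: audio.speech.create(chunk)
        O-->>A: MP3 segment
    end
    A->>P: merge segments
    P-->>A: final merged MP3 in output folder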

Explanation:

  1. User sends a POST request with text data.
  2. FastAPI quickly acknowledges the request, then spawns a background task.
  3. Chunks of text are processed via OpenAI TTS, saving individual MP3 files.
  4. pydub merges them (calling FFmpeg behind the scenes).
  5. Final merged file is ready in your output directory.

5. Conclusion

Integrating OpenAI text-to-speech with FFmpeg via pydub in a FastAPI application provides a robust, scalable way to automate TTS pipelines:

  • Reliability: Chunk-based processing handles large inputs without overloading the API.
  • Versatility: FFmpeg’s audio manipulation potential is nearly limitless.
  • Speed: Background tasks ensure the main API remains responsive.

With the sample code above, you can adapt chunk sizes, add authentication, or expand the pipeline to include more sophisticated post-processing (like watermarking, crossfading, or mixing in music). Enjoy building richer audio capabilities into your apps—OpenAI and FFmpeg make a powerful duo.

How to Set Up and Run DeepSeek-R1 Locally With Ollama and FastAPI

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

DeepSeek-R1 is a family of large language models (LLMs) known for advanced natural language capabilities. While hosting an LLM in the cloud can be convenient, local deployment provides greater control over latency, privacy, and resource utilization. Tools like Ollama simplify this process by handling model downloading and quantization. However, to truly scale or integrate these capabilities into other services, you often need a robust REST API layer—FastAPI is perfect for this.

This article covers the entire pipeline:

  1. Installing and configuring Ollama to serve DeepSeek-R1 locally
  2. Interacting with DeepSeek-R1 using the CLI, Python scripts, or a FastAPI endpoint for streaming responses
  3. Demonstrating a minimal FastAPI integration, so you can easily wrap your model in a web service

By the end, you’ll see how to run DeepSeek-R1 locally while benefiting from FastAPI’s scalability, logging, and integration features—all without sending your data to external servers.


1. Why Run DeepSeek-R1 Locally?

Running DeepSeek-R1 on your own machine has multiple advantages:

  • Privacy & Security: No data is sent to third-party services
  • Performance & Low Latency: Local inference avoids remote API calls
  • Customization: Fine-tune or adjust inference parameters as needed
  • No Rate Limits: In-house solution means no usage caps or unexpected cost spikes
  • Offline Availability: Once downloaded, the model runs even without internet access

2. Setting Up DeepSeek-R1 Locally With Ollama

2.1 Installing Ollama

  1. Download Ollama from the official website.
  2. Install it on your machine, just like any application.
note

Check Ollama’s documentation for platform-specific support. It’s available on macOS and some Linux distributions.

2.2 Download and Test DeepSeek-R1

Ollama makes model retrieval simple:

ollama run deepseek-r1

This command automatically downloads DeepSeek-R1 (the default variant). If your hardware cannot handle the full 671B-parameter model, specify a smaller distilled version:

ollama run deepseek-r1:7b
info

DeepSeek-R1 offers different parameter sizes (e.g., 1.5B, 7B, 14B, 70B, 671B) for various hardware setups.

2.3 Running DeepSeek-R1 in the Background

To serve the model continuously (useful for external services like FastAPI):

ollama serve

By default, Ollama listens on http://localhost:11434.
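
A quick way to confirm the server is up (and list the models available locally) is Ollama’s tags endpoint:

# List locally available models; also a basic health check for the server
curl http://localhost:11434/api/tags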


3. Using DeepSeek-R1 Locally

3.1 Command-Line (CLI) Inference

You can chat directly with DeepSeek-R1 in your terminal:

ollama run deepseek-r1

Type a question or prompt; responses stream back in real time.

3.2 Accessing DeepSeek-R1 via API

If you’re building an application, you can call Ollama’s REST API:

curl http://localhost:11434/api/chat -d '{
"model": "deepseek-r1",
"messages": [{ "role": "user", "content": "Solve: 25 * 25" }],
"stream": false
}'
note

Set "stream": true to receive chunked streaming responses—a feature you can integrate easily into web apps or server frameworks like FastAPI.

3.3 Python Integration

Install the ollama Python package:

pip install ollama

Then use:

import ollama

response = ollama.chat(
    model="deepseek-r1",
    messages=[
        {"role": "user", "content": "Explain Newton's second law of motion"},
    ],
)
print(response["message"]["content"])
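
The same call can stream tokens as they are generated by passing stream=True to the ollama package’s chat function:

import ollama

# Stream tokens as they arrive instead of waiting for the full response
stream = ollama.chat(
    model="deepseek-r1",
    messages=[{"role": "user", "content": "Explain Newton's second law of motion"}],
    stream=True,
)
for chunk in stream:
    print(chunk["message"]["content"], end="", flush=True)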

4. FastAPI Integration and Streaming Responses

To wrap DeepSeek-R1 in a fully customizable FastAPI service, you can define streaming endpoints for advanced usage. Below is an example that sends chunked responses to the client:

import os
import json
from typing import List
from pydantic import BaseModel
from dotenv import load_dotenv
from fastapi import FastAPI, Query
from fastapi.responses import StreamingResponse
from openai import OpenAI

from .utils.prompt import ClientMessage, convert_to_openai_messages
from .utils.tools import get_current_weather  # example tool
from .utils.tools import available_tools      # hypothetical dict of tool funcs

load_dotenv(".env.local")

app = FastAPI()
client = OpenAI(api_key="ollama", base_url="http://localhost:11434/v1/")

class Request(BaseModel):
    messages: List[ClientMessage]

def stream_text(messages: List[ClientMessage], protocol: str = 'data'):
    stream = client.chat.completions.create(
        messages=messages,
        model="deepseek-r1",
        stream=True,
    )

    if protocol == 'text':
        for chunk in stream:
            for choice in chunk.choices:
                if choice.finish_reason == "stop":
                    break
                else:
                    yield "{text}".format(text=choice.delta.content)

    elif protocol == 'data':
        draft_tool_calls = []
        draft_tool_calls_index = -1

        for chunk in stream:
            for choice in chunk.choices:
                if choice.finish_reason == "stop":
                    continue
                elif choice.finish_reason == "tool_calls":
                    for tool_call in draft_tool_calls:
                        yield f'9:{{"toolCallId":"{tool_call["id"]}","toolName":"{tool_call["name"]}","args":{tool_call["arguments"]}}}\n'

                    for tool_call in draft_tool_calls:
                        tool_result = available_tools[tool_call["name"]](**json.loads(tool_call["arguments"]))
                        yield (
                            f'a:{{"toolCallId":"{tool_call["id"]}","toolName":"{tool_call["name"]}","args":{tool_call["arguments"]},'
                            f'"result":{json.dumps(tool_result)}}}\n'
                        )
                elif choice.delta.tool_calls:
                    for tool_call in choice.delta.tool_calls:
                        id = tool_call.id
                        name = tool_call.function.name
                        arguments = tool_call.function.arguments
                        if id is not None:
                            draft_tool_calls_index += 1
                            draft_tool_calls.append({"id": id, "name": name, "arguments": ""})
                        else:
                            draft_tool_calls[draft_tool_calls_index]["arguments"] += arguments
                else:
                    yield f'0:{json.dumps(choice.delta.content)}\n'

            # usage
            if chunk.choices == []:
                usage = chunk.usage
                prompt_tokens = usage.prompt_tokens
                completion_tokens = usage.completion_tokens
                yield (
                    f'd:{{"finishReason":"{"tool-calls" if len(draft_tool_calls) > 0 else "stop"}",'
                    f'"usage":{{"promptTokens":{prompt_tokens},"completionTokens":{completion_tokens}}}}}\n'
                )

@app.post("/api/chat")
async def handle_chat_data(request: Request, protocol: str = Query('data')):
    messages = request.messages
    openai_messages = convert_to_openai_messages(messages)
    response = StreamingResponse(stream_text(openai_messages, protocol))
    response.headers['x-vercel-ai-data-stream'] = 'v1'
    return response

Key Points:

  • stream=True allows the server to stream content chunk by chunk.
  • The code handles optional “tool calls” logic—customizable for your own environment.
  • FastAPI’s StreamingResponse ensures the client receives partial output in real time.

With this setup, you can embed DeepSeek-R1 into more complex microservices or orchestrate multi-step workflows that rely on streaming LLM responses.


6. Conclusion

DeepSeek-R1 combined with Ollama and FastAPI gives you a powerful local LLM service. You can handle all aspects of data ingestion, retrieval, and inference in one place—without relying on third-party endpoints or paying subscription costs. Here’s a recap:

  • Ollama manages downloading and serving the DeepSeek-R1 models.
  • FastAPI provides a flexible web layer for streaming responses or building microservices.

Build your local AI solutions confidently and privately—DeepSeek-R1 is now at your fingertips.

Adapting Stock Forecasts with AI

· 7 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

Financial markets are dynamic: price trends, volatility, and patterns constantly change. These shifts in data distribution, commonly called concept drift, pose a serious challenge for AI models trained on historical data. When the market regime changes—such as transitioning from a calm to a volatile environment—a “stale” model can drastically lose predictive power.

DDG-DA (Data Distribution Generation for Predictable Concept Drift Adaptation) addresses this by forecasting how the data distribution might evolve in the future, instead of only reacting to the most recent data. The approach is rooted in meta-learning (via Qlib’s Meta Controller framework) and helps trading or investment models stay ahead of new trends.

By the end of this article, you will understand:

  1. Why concept drift complicates forecasting in stocks and other financial time series
  2. How DDG-DA uses a future distribution predictor to resample training data
  3. How to incorporate this into Qlib-based workflows to improve stock return and risk-adjusted performance

Concept Drift in Stock Markets

Concept drift refers to changes in the underlying distribution of stock market data. These changes can manifest in multiple ways:

  • Trends: Bull or bear markets can shift faster or slower than expected
  • Volatility: Sudden spikes can invalidate models calibrated during calmer periods
  • Patterns: Market microstructure changes or new correlations can emerge, causing old patterns to wane

Traditional methods often react after drift appears (by retraining on recent data). However, if the drift is somewhat predictable, we can model its trajectory—and proactively train models on future conditions before they fully materialize.

Diagram: Concept Drift Overview

Here, a continuous market data stream (A) encounters distribution shifts (B). These can appear as new trends (C), volatility regimes (D), or changed patterns (E). As a result, a previously trained model (F) gradually loses accuracy (G) if not adapted.


DDG-DA: High-Level Approach

The core principle behind DDG-DA is to forecast the distribution shift itself. Specifically:

  1. Predict Future Distributions

    • A meta-model observes historical tasks (for example, monthly or daily tasks in which you train a new stock-prediction model).
    • This meta-model estimates how the data distribution might move in the next period, such as anticipating an uptick in volatility or a shift in factor exposures.
  2. Generate Synthetic Training Samples

    • Using the distribution forecast, DDG-DA resamples historical data to emulate the expected future conditions.
    • It might assign higher weights to periods with similar volatility or market conditions so the final training set reflects what the market might soon become.
  3. Train or Retrain the Forecasting Model

    • Your usual forecasting model (for example, LightGBM or LSTM) is then retrained on these forward-looking samples, aligning better with the next period’s actual data distribution.
    • As a result, the model remains more accurate when concept drift occurs.

Diagram: DDG-DA Core Steps

This process repeats periodically (for example, each month) to keep your forecasting models aligned with upcoming market conditions.
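
To make the resampling step concrete, here is a minimal sketch (not the DDG-DA implementation): weight historical samples by how closely their regime matches the forecast distribution, then resample the training set with those weights. All numbers are synthetic and the volatility forecast is assumed to come from the meta-model.

import numpy as np

rng = np.random.default_rng(0)
realized_vol = rng.uniform(0.1, 0.5, size=1000)   # per-sample volatility regime
returns = rng.normal(0, realized_vol)             # toy label data

predicted_next_vol = 0.4  # assumed output of the distribution forecaster

# Upweight samples whose regime resembles the predicted next-period regime
weights = np.exp(-((realized_vol - predicted_next_vol) ** 2) / 0.01)
weights /= weights.sum()

# Forward-looking training set: resample according to the predicted distribution
idx = rng.choice(len(returns), size=len(returns), replace=True, p=weights)
train_returns = returns[idx]
print(train_returns.std())  # closer to the predicted high-vol regime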


How It Integrates with Qlib

Qlib provides an AI-oriented Quantitative Investment Platform that handles:

  • Data: Collecting and structuring historical pricing data, factors, and fundamentals
  • Modeling: Building daily or intraday forecasts using built-in ML or custom models
  • Meta Controller: A specialized component for tasks like DDG-DA, which revolve around higher-level meta-learning and distribution adaptation

Diagram: Qlib plus DDG-DA Integration

  1. Qlib Data Layer (A): Feeds into a standard ML pipeline for daily or intraday forecasting (B).
  2. DDG-DA sits in the Meta Controller (C), analyzing tasks, predicting distribution changes, and adjusting the pipeline.
  3. Results circle back into Qlib for backtesting and analysis (D).

Example: Monthly Stock Trend Forecasting

  1. Setting the Tasks

    • Suppose you update your stock-ranking model every month, using the last 2 years of data.
    • Each month is a “task” in Qlib. Over multiple months, you get a series of tasks for training and validation.
  2. Train the Meta-Model

    • DDG-DA learns a function that maps old data distribution patterns to new sample weights.
    • This ensures the next month’s training data distribution is closer to the actual conditions that month.
  3. Evaluate

    • Compare the results to standard approaches:
      • Rolling Retrain: Only uses the most recent data, ignoring the predictable drift pattern
      • Gradual Forgetting: Weighted by how recent data is, but no direct distribution forecast
      • DDG-DA: Weighs data by predicted future distribution, leading to stronger alignment when drift is not purely random

Diagram: Monthly Task Workflow


Performance and Findings

Research in the associated DDG-DA paper and Qlib examples shows:

  • Better Signal Quality: Higher Information Coefficient (IC) for stock selection
  • Enhanced Portfolio Returns: Larger annual returns, improved Sharpe Ratio, and lower drawdowns in backtests
  • Versatility: Works with a wide range of ML models (Linear, LightGBM, neural networks)
  • Limitations: If concept drift is completely random or abrupt (no pattern), DDG-DA’s advantages diminish. Predictability is key

Diagram: Performance Improvement


Practical Steps

  1. Install Qlib and ensure you have the dataset (for example, Alpha158) set up
  2. Clone the DDG-DA Example from the Qlib GitHub:
    git clone https://github.com/microsoft/qlib.git
    cd qlib/examples/benchmarks_dynamic/DDG-DA
  3. Install Requirements:
    pip install -r requirements.txt
  4. Run the Workflow:
    python workflow.py run
    • By default, it uses a simple linear forecasting model
    • To use LightGBM or another model, specify the --conf_path argument, for example:
      python workflow.py --conf_path=../workflow_config_lightgbm_Alpha158.yaml run
  5. Analyze Results:
    • Qlib’s recorder logs signal metrics (IC, ICIR) and backtest performance (annual return, Sharpe)
    • Compare with baseline methods (Rolling Retrain, Exponential Forgetting, etc.)

Diagram: Running DDG-DA Workflow


Conclusion

DDG-DA shows how AI can proactively tackle concept drift in stock forecasting. Instead of merely reacting to new data, it anticipates potential distribution changes, producing a more robust, forward-looking training set. When integrated into Qlib’s Meta Controller, it seamlessly fits your existing pipelines, from data ingestion to backtesting.

For practical use:

  • Ensure your market conditions exhibit some predictability. Random, sudden changes are harder to model
  • Combine with conventional best practices (risk management, hyperparameter tuning) for a holistic pipeline
  • Monitor performance: If drift patterns shift, you may need to retrain or retune the DDG-DA meta-model

By forecasting future market states and adapting ahead of time, DDG-DA helps your quantitative strategies remain agile and profitable in evolving financial environments.


Further Reading and References

Happy (adaptive) trading!

Leveraging Qlib and MLflow for Unified Experiment Tracking

· 5 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

Financial markets present a dynamic environment where active research and experimentation are critical. Qlib offers a complete “AI-oriented” solution for quantitative investment—covering data loaders, feature engineering, model training, and evaluation. Meanwhile, MLflow provides robust functionality for experiment tracking, handling metrics, artifacts, and hyperparameters across multiple runs. You can further enhance your documentation using specialized syntax for highlighting important information, such as notes or warnings, to help readers navigate complex workflows.

This article shows how to integrate Qlib and MLflow to manage your entire workflow—from data ingestion and factor engineering to model storage and versioning—under a single, unified experiment system. It also demonstrates various ways to emphasize notes or warnings to help readers explore the complexities of this setup.

By the end of this article, you will learn:

  1. How Qlib manages data and modeling in a typical quant workflow
  2. How MLflow tracks experiment artifacts, logs metrics, and organizes multiple runs
  3. How to integrate Qlib’s “Recorder” concept with MLflow’s tracking

1. Qlib Overview

Qlib is a powerful open-source toolkit designed for AI-based quantitative investment. It streamlines common challenges in this domain:

  • Data Layer: Standardizes daily or intraday bars, fundamental factors, and alpha signals
  • Feature Engineering: Offers an expression engine (alpha modeling) plus factor definitions
  • Modeling: Easily pluggable ML models (LightGBM, Linear, RNN, etc.) with out-of-the-box training logic
  • Evaluation and Backtest: Includes modules for analyzing signals, computing IC/RankIC, and running trading strategies in a backtest simulator

Diagram: Qlib Architecture

Below is a high-level view of Qlib’s architecture—how data flows from raw sources into Qlib’s data handlers, transforms into features, and ultimately fuels model training.

note

Some Qlib features—like intraday data handling or advanced factor expressions—may require additional configuration. Double-check your data paths and environment setup to ensure all pieces are properly configured.


2. MLflow Overview

MLflow is an experiment-tracking tool that organizes runs and artifacts:

  • Tracking: Logs params, metrics, tags, and artifacts (model checkpoints, charts)
  • UI: A local or remote interface (mlflow ui) for comparing runs side by side
  • Model Registry: Version controls deployed models, enabling easy rollback or re-deployment

Diagram: MLflow Overview

warning

When configuring MLflow on remote servers, remember to secure the tracking server appropriately. Unsecured endpoints may expose logs and artifacts to unintended parties.


3. Combining Qlib and MLflow

In typical usage, Qlib handles data ingestion, feature transformations, and model training. MLflow complements it by capturing:

  1. Run Metadata: Each Qlib “Recorder” maps to an MLflow run
  2. Metrics & Params: Qlib logs metrics like Sharpe Ratio or Information Coefficient (IC); MLflow’s UI centralizes them
  3. Artifacts: Saved model files, prediction results, or charts are stored in MLflow’s artifact repository

Diagram: Qlib + MLflow Integration

Below is a top-down diagram showing how user code interacts with Qlib, which in turn leverages MLflow for run logging.


4. Minimal Example

Here’s a simplified script showing the synergy among the three components:

import qlib
from qlib.workflow import R
from qlib.utils import init_instance_by_config

# 1) Init Qlib and MLflow
qlib.init(
    exp_manager={
        "class": "MLflowExpManager",
        "module_path": "qlib.workflow.expm",
        "kwargs": {
            "uri": "file:/path/to/mlruns",
            "default_exp_name": "QlibExperiment"
        },
    }
)

# 2) Start experiment and train
with R.start(experiment_name="QlibExperiment", recorder_name="run1"):
    # Basic config
    model_config = {"class": "LightGBMModel", "kwargs": {"learning_rate": 0.05}}
    dataset_config = {...}

    model = init_instance_by_config(model_config)
    dataset = init_instance_by_config(dataset_config)
    model.fit(dataset)

    # Evaluate
    predictions = model.predict(dataset)

    # log some metrics
    R.log_metrics(Sharpe=1.2, IC=0.03)

    # Save artifacts
    R.save_objects(**{"pred.pkl": predictions, "trained_model.pkl": model})
info

The snippet above logs metrics like Sharpe or IC, making them easily comparable across multiple runs. You can further log hyperparameters via R.log_params(...) for more granular comparisons.

Results:

  • A new MLflow run named “run1” under “QlibExperiment”
  • MLflow logs parameters/metrics (learning_rate, Sharpe, IC)
  • Artifacts “pred.pkl” and “trained_model.pkl” appear in MLflow’s artifact UI
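
To compare hyperparameters across runs (as the note above suggests), they can be logged inside the same run; a minimal sketch reusing the experiment and recorder names from the example, with illustrative parameter values:

from qlib.workflow import R

with R.start(experiment_name="QlibExperiment", recorder_name="run1"):
    # Parameters show up next to metrics in MLflow's run comparison view
    R.log_params(learning_rate=0.05, model_class="LightGBMModel")
    R.log_metrics(Sharpe=1.2, IC=0.03)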

5. Best Practices

  1. Organize Qlib tasks: Use Qlib’s SignalRecord or PortAnaRecord classes to store signals/backtest results, ensuring logs are automatically tied to MLflow runs
  2. Parameter Logging: Send hyperparameters or relevant config to R.log_params(...) for easy comparison in MLflow
  3. Artifact Naming: Keep artifact names consistent (e.g., "pred.pkl") across multiple runs
  4. Model Registry: Consider pushing your best runs to MLflow’s Model Registry for versioned deployment
danger

A mismatch between your local Qlib environment and remote MLflow server can cause logging errors. Ensure both environments are in sync (same Python versions, same library versions).


6. Conclusion

By connecting Qlib’s experiment pipeline to MLflow’s tracking features—and documenting everything thoroughly—you get the best of all worlds:

  • Qlib: AI-centric quant platform automating data handling, factor engineering, and modeling
  • MLflow: A robust interface for comparing runs, storing artifacts, and version-controlling the entire process

This synergy simplifies large-scale experimentation—especially when you frequently iterate over factor definitions, hyperparameters, or new trading strategies.


Further Reading and References

Happy experimenting!

Qlib’s Nested Execution for High-Frequency Trading with AI

· 6 min read
Vadim Nicolai
Senior Software Engineer at Vitrifi

Introduction

High-Frequency Trading (HFT) involves handling large volumes of orders at extremely high speeds—often measured in microseconds or milliseconds. AI (machine learning and reinforcement learning, in particular) has become pivotal in capturing fleeting market opportunities and managing real-time decisions in these ultra-fast trading environments.

In Qlib, the Nested Decision Execution Framework simplifies building multi-level HFT strategies, allowing a high-level (daily or weekly) strategy to nest an intraday (or sub-intraday) executor or sub-workflow. This design enables realistic joint backtesting: daily portfolio selection and intraday HFT execution interact seamlessly, ensuring that real slippage, partial fills, and transaction costs are accurately accounted for.

By the end of this guide, you’ll understand:

  1. How Qlib structures multi-level workflows (daily vs. intraday).
  2. How AI techniques (supervised and reinforcement learning) slot into Qlib’s design.
  3. How to set up an Executor sub-workflow for high-frequency order splitting and real-time decision-making.

Multi-Level Strategy Workflow

Below is an overview diagram (adapted from Qlib’s documentation) depicting how daily strategies can nest intraday sub-strategies or RL agents:

  • Daily Strategy: Generates coarse decisions (e.g., “Buy X shares by day’s end”).
  • Executor: Breaks decisions into smaller actions. Within it, a Reinforcement Learning policy (or any other AI model) can run at minute or sub-minute intervals.
  • Simulator/Environment: Provides intraday data, simulates order fills/slippage, and feeds rewards back to the RL policy.

This nesting allows realistic interaction between daily allocation goals and intraday fill performance.
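
To give a feel for how this nesting is expressed in practice, here is a rough sketch of a two-level executor configuration in the style of Qlib’s nested_decision_execution example. The class and module paths follow that example but should be verified against your Qlib version, and the kwargs shown are illustrative rather than exhaustive.

# Sketch only: outer daily executor wrapping an intraday strategy + simulator.
executor_config = {
    "class": "NestedExecutor",
    "module_path": "qlib.backtest.executor",
    "kwargs": {
        "time_per_step": "day",            # outer (daily) decision frequency
        "inner_strategy": {                # intraday strategy that splits daily orders
            "class": "TWAPStrategy",
            "module_path": "qlib.contrib.strategy.rule_strategy",
        },
        "inner_executor": {                # innermost simulator that fills the sub-orders
            "class": "SimulatorExecutor",
            "module_path": "qlib.backtest.executor",
            "kwargs": {"time_per_step": "30min"},
        },
    },
}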


Key Components

1. Information Extractor (Intraday)

For HFT, Qlib can store data at 1-minute intervals, or even tick/orderbook-level data, using specialized backends (e.g., Arctic). An example below shows how Qlib can manage non-fixed-frequency records:

# Example snippet from qlib/examples/orderbook_data
# Download sample data, then import into your local mongo or Arctic DB
python create_dataset.py initialize_library
python create_dataset.py import_data

Once imported, intraday/tick data can be accessed by Qlib’s normal data APIs for feature engineering or direct RL state representation.
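
For instance, once a 1-minute provider is in place, pulling intraday bars looks the same as pulling daily ones, just with a finer freq. A sketch follows; the provider path, instrument code, and dates are placeholders.

import qlib
from qlib.data import D

# Assumes a local Qlib provider that contains 1-minute bars
qlib.init(provider_uri="~/.qlib/qlib_data/cn_data_1min", region="cn")

df = D.features(
    instruments=["SH600519"],
    fields=["$close", "$volume"],
    start_time="2023-01-03 09:30:00",
    end_time="2023-01-03 15:00:00",
    freq="1min",
)
print(df.head())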


2. Forecast Model (Intraday + Daily)

A single Qlib workflow can hold multiple forecast models:

  • Daily Model: Predicts overnight returns or daily alpha (e.g., LightGBM on daily bars).
  • Intraday Model: Predicts short-term (minutes/seconds) price movements. This might be a small neural net or an RL policy evaluating states like order-book depth, spread, volume patterns, etc.

Qlib’s reinforcement learning interface (QlibRL) can also handle advanced models:

  • Policy: Learns from reward signals (e.g., PnL, transaction costs, slippage).
  • Action Interpreter: Converts policy actions into actual orders.
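
Conceptually, the action interpreter is just a mapping from the policy’s abstract action to a concrete order. A deliberately simplified, hypothetical version (not QlibRL’s actual interface) might look like this:

# Hypothetical illustration only: map a discrete policy action to an order size.
ACTION_FRACTIONS = [0.0, 0.25, 0.5, 1.0]  # fraction of the remaining target to trade now

def interpret_action(action_index: int, remaining_shares: int) -> int:
    """Turn a discrete policy action into a share quantity for this step."""
    return int(remaining_shares * ACTION_FRACTIONS[action_index])

# e.g. action 2 with 400 shares left -> send a 200-share order this step
assert interpret_action(2, 400) == 200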

3. Decision Generator (Daily vs. Intraday)

Daily Decision Generator might produce a target portfolio:

Stock A: +5% allocation
Stock B: -2% allocation

Intraday Decision Generator (within the Executor) can then split these top-level instructions into multiple smaller trades. For example, an RL policy might decide to buy 2% of Stock A during the opening auction, 1% during midday, and 2% near closing, based on real-time microprice signals.
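
Stripped of the learning component, the splitting step itself is simple bookkeeping. Below is a hypothetical helper; the bucket labels and weights are illustrative, and a real RL policy would choose them from live signals rather than a fixed schedule.

from typing import Dict, List, Tuple

def split_daily_target(target_pct: float, buckets: List[Tuple[str, float]]) -> Dict[str, float]:
    """Split a daily allocation target across intraday time buckets."""
    assert abs(sum(w for _, w in buckets) - 1.0) < 1e-9, "bucket weights must sum to 1"
    return {window: target_pct * weight for window, weight in buckets}

# 5% daily target for Stock A split across three windows
print(split_daily_target(5.0, [("open", 0.4), ("midday", 0.2), ("close", 0.4)]))
# -> {'open': 2.0, 'midday': 1.0, 'close': 2.0}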


4. Executor & Sub-workflow (Nested)

Executor is where the nested approach truly shines. It wraps a more granular intraday or high-frequency sub-strategy.

This sub-workflow can be as simple as scheduling trades evenly or as advanced as an RL policy that:

  1. Observes short-term price movement.
  2. Acts to minimize slippage and transaction cost.
  3. Receives reward signals from the environment (filled shares, average fill price vs. VWAP, etc.).

5. Environment & Simulator

When applying Reinforcement Learning, Qlib uses an Environment wrapper:

  1. State: Intraday features (latest LOB data, partial fill stats).
  2. Action: The RL agent chooses to place a limit order, market order, or skip.
  3. Reward: Often the negative cost of trading or realized PnL improvement.

You can leverage Qlib’s built-in simulators or customize them for specific market microstructures.
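
The loop below is a deliberately tiny, hypothetical environment in the usual reset/step style, just to make the state, action, and reward roles concrete; QlibRL’s actual simulators and interpreters wrap this pattern with far more structure.

class ToyExecutionEnv:
    """Minimal order-execution environment: fill a share target within at most 10 steps."""

    def __init__(self, shares_to_fill: int):
        self.target = shares_to_fill

    def reset(self):
        self.remaining = self.target
        self.steps = 0
        return {"remaining": self.remaining, "step": self.steps}  # state

    def step(self, action_shares: int):
        # Action: how many shares to send this step (0 = skip)
        filled = min(action_shares, self.remaining)
        self.remaining -= filled
        self.steps += 1
        reward = -0.01 * filled  # stand-in for transaction cost; real rewards use fills vs. VWAP, PnL, etc.
        done = self.remaining == 0 or self.steps >= 10
        return {"remaining": self.remaining, "step": self.steps}, reward, done

env = ToyExecutionEnv(shares_to_fill=100)
state, done = env.reset(), False
while not done:
    state, reward, done = env.step(action_shares=20)  # a fixed policy, purely for illustration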


Example Workflow Snippets

Here’s a high-level script illustrating a daily + intraday nested setup. (Pseudocode for demonstration only.)

# daily_intraday_workflow.py

import qlib
from qlib.config import C
from qlib.data import D
from qlib.rl.order_execution_policy import RLOrderExecPolicy
from qlib.strategy.base import BaseStrategy

class DailyAlphaStrategy(BaseStrategy):
    """Generates daily-level decisions (which stocks to buy/sell)."""

    def generate_trade_decision(self, *args, **kwargs):
        # Imagine we have daily predictions from a model...
        scores = self.signal.get_signal()  # daily alpha scores
        # Then produce a dictionary {stock: weight or shares}
        decisions = compute_target_positions(scores)
        return decisions

class NestedExecutor:
    """Executor that calls an intraday RL sub-strategy for each daily decision."""

    def __init__(self, intraday_policy):
        self.intraday_policy = intraday_policy

    def execute_daily_decision(self, daily_decision):
        # Suppose daily_decision = { 'AAPL': +100 shares, 'MSFT': +50 shares }
        # We'll break it into sub-orders via RL
        for stock, shares in daily_decision.items():
            # RL agent decides how to place those shares intraday
            self.intraday_policy.run_execution(stock, shares)

def main():
    qlib.init(provider_uri="your_data_path")  # local data or remote server

    daily_strategy = DailyAlphaStrategy(signal=your_daily_signal)
    intraday_policy = RLOrderExecPolicy()  # RL policy with QlibRL

    executor = NestedExecutor(intraday_policy=intraday_policy)

    # Hypothetical daily loop
    for date in trading_calendar:
        daily_decision = daily_strategy.generate_trade_decision()
        executor.execute_daily_decision(daily_decision)

if __name__ == "__main__":
    main()

Notes:

  • DailyAlphaStrategy uses a daily alpha model for stock scoring.
  • NestedExecutor calls RLOrderExecPolicy, which runs intraday steps.
  • Real code will handle position objects, trade calendars, and backtest frameworks in more detail.

Practical Tips for HFT + AI

  1. Data Freshness: HFT signals must be updated almost in real-time. Ensure your Qlib data pipeline is either streaming or as close to real-time as possible.
  2. Latency Considerations: Real HFT in production must address network latency and order routing. Qlib’s framework focuses on backtesting or simulation; integrating actual exchange connectivity is non-trivial.
  3. Overfitting & Market Regimes: Intraday data is often noisy; guard against overfitting your ML or RL models to fleeting patterns.
  4. Joint Optimization: Tweaking daily portfolio turnover and intraday execution in isolation can be suboptimal. Qlib’s nested design helps you see the whole chain’s PnL effect.
  5. Reinforcement Learning: Start simple (e.g., Q-learning or policy gradient) before moving to complex neural networks. Use carefully designed rewards capturing cost, fill rates, and profit.
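
On the reward point in tip 5, one common and deliberately simple shape is price improvement versus VWAP net of fees; the function below is an illustrative sketch, not a recommended production reward.

def execution_reward(filled_shares: float, avg_fill_price: float, vwap: float, fee_rate: float = 0.0005) -> float:
    """Reward for a buy order: positive when we fill below VWAP, minus proportional fees."""
    price_improvement = (vwap - avg_fill_price) * filled_shares
    fees = fee_rate * avg_fill_price * filled_shares
    return price_improvement - fees

# Filled 1,000 shares at 9.98 against a 10.00 VWAP: improvement 20.0, fees ~4.99
print(execution_reward(1_000, 9.98, 10.00))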

Summary

By combining AI (supervised or RL models) with a Nested Decision Execution approach, Qlib lets you:

  • Unify Daily and Intraday strategies in a single backtest.
  • Leverage Real-time AI for micro-execution decisions.
  • Optimize both large-scale allocations and fine-grained order placements simultaneously.

This framework is especially powerful for High-Frequency Trading use cases, where multiple decision layers (portfolio vs. sub-second order slicing) must interact. Whether you’re using classical ML or advanced RL, Qlib streamlines experimentation and helps close the gap between daily trading and ultra-fast intraday execution.


Further Reading & References

Happy trading!