Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions rdagent/components/coder/factor_coder/evaluators.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ast
import re

from rdagent.components.coder.CoSTEER.evaluators import (
Expand All @@ -16,6 +17,95 @@

FactorSingleFeedback = CoSTEERSingleFeedbackDeprecated

# Identifier hints used to recognise an outer ``for`` loop that iterates over
# stocks / instruments / tickers / symbols / codes.
_INSTRUMENT_LOOP_HINTS = ("instrument", "stock", "ticker", "symbol", "code")

# Substrings used to recognise an ML estimator constructor or training call
# inside a nested loop body. Matched against ``ast.unparse`` of the inner
# ``for`` body, so they catch both ``model.fit(...)`` style calls and direct
# instantiation of common estimators.
_ML_TRAINING_PATTERNS = re.compile(
r"\b(?:"
r"\.fit\s*\(|"
r"\.partial_fit\s*\(|"
r"\.train\s*\(|"
r"train_[a-z_]+\s*\(|"
r"LSTM\s*\(|GRU\s*\(|RNN\s*\(|Transformer\s*\(|"
r"RandomForest\w*\s*\(|XGB\w+\s*\(|LGBM\w*\s*\(|CatBoost\w*\s*\(|"
r"GradientBoosting\w*\s*\(|SVR\s*\(|SVC\s*\(|"
r"MLPRegressor\s*\(|MLPClassifier\s*\(|Sequential\s*\("
r")"
)

_PER_INSTRUMENT_TRAINING_FEEDBACK = (
"Performance-critical anti-pattern detected: the factor code wraps an ML estimator "
"construction or training call inside a nested loop over instruments and time. With "
"N instruments x T trading days this produces O(N * T) training iterations and the "
"run hangs at 100% CPU for hours on realistic A-share panels (see issue #1407). "
"Fit the estimator exactly once on the full (datetime, instrument) panel and "
"batch-predict for every row in a single call, or restrict ML usage to vectorized "
"rolling closed-form estimators that can be expressed via "
"groupby(level='instrument').rolling(...).apply(...) or pandas/numpy operations. "
"Do not re-instantiate or re-train the model per stock or per date."
)


def _identifier_text(node: ast.AST) -> str:
"""Return a lowercased text snippet for ``node`` suitable for hint matching."""

try:
return ast.unparse(node).lower()
except Exception: # pragma: no cover - defensive, ast.unparse is stable on >=3.9
return ""


def _body_source(nodes: list[ast.stmt]) -> str:
parts: list[str] = []
for node in nodes:
try:
parts.append(ast.unparse(node))
except Exception: # pragma: no cover - defensive
continue
return "\n".join(parts)


def detect_per_instrument_training_antipattern(code: str) -> str | None:
"""Detect the nested per-instrument / per-day ML training anti-pattern.

Returns a critic-style feedback string if the anti-pattern is present in
``code``, otherwise ``None``. Callers can use the message verbatim as
code feedback so the LLM gets actionable repair guidance without paying
for a multi-hour execution attempt first.
"""

if not code:
return None

try:
tree = ast.parse(code)
except SyntaxError:
# Syntax issues are surfaced through the normal execution path.
return None

for outer in ast.walk(tree):
if not isinstance(outer, (ast.For, ast.AsyncFor)):
continue

outer_target = _identifier_text(outer.target)
outer_iter = _identifier_text(outer.iter)
if not any(hint in outer_target or hint in outer_iter for hint in _INSTRUMENT_LOOP_HINTS):
continue

for inner in ast.walk(outer):
if inner is outer or not isinstance(inner, (ast.For, ast.AsyncFor)):
continue
inner_body_source = _body_source(list(inner.body))
if _ML_TRAINING_PATTERNS.search(inner_body_source):
return _PER_INSTRUMENT_TRAINING_FEEDBACK

return None


class FactorEvaluatorForCoder(CoSTEEREvaluator):
"""This class is the v1 version of evaluator for a single factor implementation.
Expand Down Expand Up @@ -58,6 +148,22 @@ def evaluate(
else:
factor_feedback = FactorSingleFeedback()

# Pre-execution static check for the per-instrument / per-day ML
# training anti-pattern (issue #1407). On realistic stock panels
# this anti-pattern hangs ``implementation.execute()`` for hours,
# so short-circuit with critic-style feedback before paying that
# cost and let CoSTEER repair using the guidance instead.
anti_pattern_feedback = detect_per_instrument_training_antipattern(implementation.all_codes)
if anti_pattern_feedback is not None:
factor_feedback.execution_feedback = anti_pattern_feedback
factor_feedback.value_generated_flag = False
factor_feedback.value_feedback = "No factor value generated, skip value evaluation."
factor_feedback.code_feedback = anti_pattern_feedback
factor_feedback.final_decision = False
factor_feedback.final_feedback = anti_pattern_feedback
factor_feedback.final_decision_based_on_gt = gt_implementation is not None
return factor_feedback

# 1. Get factor execution feedback to generated implementation and remove the long list of numbers in execution feedback
(
execution_feedback,
Expand Down
23 changes: 22 additions & 1 deletion rdagent/scenarios/qlib/experiment/prompts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,34 @@ qlib_factor_strategy: |-
other_df['datetime'] = pd.to_datetime(other_df['datetime'])
```

5. **Merge operations**:
5. **Merge operations**:
```python
# When merging DataFrames, ensure you are merging on both 'datetime' and 'instrument'.
# If these are part of the index, reset the index before merging.
merged_df = pd.merge(df, other_df, on=['datetime', 'instrument'], how='inner')
```

6. **Avoid per-instrument or per-day model retraining (performance critical)**:
The factor MUST be computable in a single vectorized pass over the full
`(datetime, instrument)` panel. If the factor involves an ML estimator
(e.g. LSTM, GRU, Transformer, RandomForest, XGBoost, LightGBM, MLP), fit
the model exactly ONCE on the full panel and batch-predict for every
`(datetime, instrument)` row in a single call. Do NOT wrap `model.fit(...)`
or estimator construction inside `for instrument in instruments:` or
`for day in trading_days:` loops -- on a realistic 5K-instrument x 1K-day
A-share panel that creates O(instruments * days) fit calls (~10^8 training
iterations) and the run hangs at 100% CPU for hours instead of finishing
in minutes. Use either:
```python
# Pre-fit once on the full panel, then vectorized predict.
model = SomeEstimator(...).fit(X_panel, y_panel)
df['factor_value'] = model.predict(X_panel)
```
or restrict ML usage to lightweight rolling closed-form estimators that
can be expressed with `groupby(level='instrument').rolling(...).apply(...)`
or pandas/numpy vectorized ops -- never a Python-level nested loop that
re-instantiates and re-trains the estimator per stock or per date.

qlib_factor_output_format: |-
Your output should be a pandas dataframe similar to the following example information:
<class 'pandas.core.frame.DataFrame'>
Expand Down
139 changes: 139 additions & 0 deletions test/utils/coder/test_factor_antipattern.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Regression tests for issue #1407.

The factor coder occasionally generates ML-based factors whose
implementation wraps model construction or training inside a nested loop
over instruments and time. On a realistic 5K-instrument / 1K-day A-share
panel that produces O(instruments * days) training iterations and the run
hangs for hours. ``detect_per_instrument_training_antipattern`` catches
the pattern statically so CoSTEER can surface critic feedback before
spending the wall-clock cost of executing the offending code.
"""

from __future__ import annotations

import unittest

import pytest

from rdagent.components.coder.factor_coder.evaluators import (
detect_per_instrument_training_antipattern,
)


@pytest.mark.offline
class DetectPerInstrumentTrainingAntipatternTest(unittest.TestCase):

def test_lstm_per_instrument_per_day_loop_is_flagged(self) -> None:
code = """
import pandas as pd
import torch
import torch.nn as nn


def calculate_lstm_factor(df):
instruments = df.index.get_level_values('instrument').unique()
trading_days = df.index.get_level_values('datetime').unique()
result = pd.Series(index=df.index, dtype=float)
for instrument in instruments:
for day in trading_days:
model = nn.LSTM(input_size=5, hidden_size=8, num_layers=1)
optimizer = torch.optim.Adam(model.parameters())
# ... train per (instrument, day) ...
model.fit(df.loc[(slice(None, day), instrument)])
pred = model.predict(df.loc[(day, instrument)])
result.loc[(day, instrument)] = pred
return result
"""

feedback = detect_per_instrument_training_antipattern(code)
self.assertIsNotNone(feedback)
self.assertIn("anti-pattern", feedback)
self.assertIn("#1407", feedback)

def test_random_forest_per_stock_retraining_is_flagged(self) -> None:
code = """
from sklearn.ensemble import RandomForestRegressor


def calculate_rf_factor(df):
for stock_code in df.index.get_level_values('instrument').unique():
for day in df.index.get_level_values('datetime').unique():
rf = RandomForestRegressor(n_estimators=100)
rf.fit(features, target)
preds.append(rf.predict(features))
return preds
"""

feedback = detect_per_instrument_training_antipattern(code)
self.assertIsNotNone(feedback)

def test_xgboost_per_ticker_loop_is_flagged(self) -> None:
code = """
import xgboost as xgb


def calculate_factor(df):
for ticker in tickers:
for d in days:
booster = xgb.XGBRegressor(n_estimators=200)
booster.fit(X_train, y_train)
out[ticker, d] = booster.predict(X_test)[0]
return out
"""

feedback = detect_per_instrument_training_antipattern(code)
self.assertIsNotNone(feedback)

def test_panel_level_single_fit_is_allowed(self) -> None:
# The recommended pattern -- one fit on the full panel, then batch
# predict -- must not be flagged.
code = """
from sklearn.ensemble import RandomForestRegressor


def calculate_factor(panel_df, X, y):
model = RandomForestRegressor(n_estimators=200)
model.fit(X, y)
panel_df['factor'] = model.predict(X)
return panel_df
"""

self.assertIsNone(detect_per_instrument_training_antipattern(code))

def test_nested_loop_without_training_call_is_allowed(self) -> None:
# Nested iteration over instruments and dates with only statistical
# operations (no .fit / no estimator constructor) must not be flagged.
code = """
def calculate_momentum(df):
out = {}
for instrument in df.index.get_level_values('instrument').unique():
for day in df.index.get_level_values('datetime').unique():
out[(day, instrument)] = df.loc[(day, instrument), 'close'].pct_change(20).mean()
return out
"""

self.assertIsNone(detect_per_instrument_training_antipattern(code))

def test_groupby_rolling_apply_is_allowed(self) -> None:
code = """
def calculate_rolling_factor(df):
return (
df.groupby(level='instrument')['close']
.rolling(20)
.apply(lambda x: x.mean())
)
"""

self.assertIsNone(detect_per_instrument_training_antipattern(code))

def test_syntax_error_returns_none(self) -> None:
# Syntax-broken code is handled by the normal execution path -- the
# detector must not raise on it.
self.assertIsNone(detect_per_instrument_training_antipattern("def broken("))

def test_empty_code_returns_none(self) -> None:
self.assertIsNone(detect_per_instrument_training_antipattern(""))


if __name__ == "__main__":
unittest.main()
Loading