diff --git a/rdagent/components/coder/factor_coder/evaluators.py b/rdagent/components/coder/factor_coder/evaluators.py index 464f4dd11..873f1be88 100644 --- a/rdagent/components/coder/factor_coder/evaluators.py +++ b/rdagent/components/coder/factor_coder/evaluators.py @@ -1,3 +1,4 @@ +import ast import re from rdagent.components.coder.CoSTEER.evaluators import ( @@ -16,6 +17,95 @@ FactorSingleFeedback = CoSTEERSingleFeedbackDeprecated +# Identifier hints used to recognise an outer ``for`` loop that iterates over +# stocks / instruments / tickers / symbols / codes. +_INSTRUMENT_LOOP_HINTS = ("instrument", "stock", "ticker", "symbol", "code") + +# Substrings used to recognise an ML estimator constructor or training call +# inside a nested loop body. Matched against ``ast.unparse`` of the inner +# ``for`` body, so they catch both ``model.fit(...)`` style calls and direct +# instantiation of common estimators. +_ML_TRAINING_PATTERNS = re.compile( + r"\b(?:" + r"\.fit\s*\(|" + r"\.partial_fit\s*\(|" + r"\.train\s*\(|" + r"train_[a-z_]+\s*\(|" + r"LSTM\s*\(|GRU\s*\(|RNN\s*\(|Transformer\s*\(|" + r"RandomForest\w*\s*\(|XGB\w+\s*\(|LGBM\w*\s*\(|CatBoost\w*\s*\(|" + r"GradientBoosting\w*\s*\(|SVR\s*\(|SVC\s*\(|" + r"MLPRegressor\s*\(|MLPClassifier\s*\(|Sequential\s*\(" + r")" +) + +_PER_INSTRUMENT_TRAINING_FEEDBACK = ( + "Performance-critical anti-pattern detected: the factor code wraps an ML estimator " + "construction or training call inside a nested loop over instruments and time. With " + "N instruments x T trading days this produces O(N * T) training iterations and the " + "run hangs at 100% CPU for hours on realistic A-share panels (see issue #1407). " + "Fit the estimator exactly once on the full (datetime, instrument) panel and " + "batch-predict for every row in a single call, or restrict ML usage to vectorized " + "rolling closed-form estimators that can be expressed via " + "groupby(level='instrument').rolling(...).apply(...) or pandas/numpy operations. " + "Do not re-instantiate or re-train the model per stock or per date." +) + + +def _identifier_text(node: ast.AST) -> str: + """Return a lowercased text snippet for ``node`` suitable for hint matching.""" + + try: + return ast.unparse(node).lower() + except Exception: # pragma: no cover - defensive, ast.unparse is stable on >=3.9 + return "" + + +def _body_source(nodes: list[ast.stmt]) -> str: + parts: list[str] = [] + for node in nodes: + try: + parts.append(ast.unparse(node)) + except Exception: # pragma: no cover - defensive + continue + return "\n".join(parts) + + +def detect_per_instrument_training_antipattern(code: str) -> str | None: + """Detect the nested per-instrument / per-day ML training anti-pattern. + + Returns a critic-style feedback string if the anti-pattern is present in + ``code``, otherwise ``None``. Callers can use the message verbatim as + code feedback so the LLM gets actionable repair guidance without paying + for a multi-hour execution attempt first. + """ + + if not code: + return None + + try: + tree = ast.parse(code) + except SyntaxError: + # Syntax issues are surfaced through the normal execution path. + return None + + for outer in ast.walk(tree): + if not isinstance(outer, (ast.For, ast.AsyncFor)): + continue + + outer_target = _identifier_text(outer.target) + outer_iter = _identifier_text(outer.iter) + if not any(hint in outer_target or hint in outer_iter for hint in _INSTRUMENT_LOOP_HINTS): + continue + + for inner in ast.walk(outer): + if inner is outer or not isinstance(inner, (ast.For, ast.AsyncFor)): + continue + inner_body_source = _body_source(list(inner.body)) + if _ML_TRAINING_PATTERNS.search(inner_body_source): + return _PER_INSTRUMENT_TRAINING_FEEDBACK + + return None + class FactorEvaluatorForCoder(CoSTEEREvaluator): """This class is the v1 version of evaluator for a single factor implementation. @@ -58,6 +148,22 @@ def evaluate( else: factor_feedback = FactorSingleFeedback() + # Pre-execution static check for the per-instrument / per-day ML + # training anti-pattern (issue #1407). On realistic stock panels + # this anti-pattern hangs ``implementation.execute()`` for hours, + # so short-circuit with critic-style feedback before paying that + # cost and let CoSTEER repair using the guidance instead. + anti_pattern_feedback = detect_per_instrument_training_antipattern(implementation.all_codes) + if anti_pattern_feedback is not None: + factor_feedback.execution_feedback = anti_pattern_feedback + factor_feedback.value_generated_flag = False + factor_feedback.value_feedback = "No factor value generated, skip value evaluation." + factor_feedback.code_feedback = anti_pattern_feedback + factor_feedback.final_decision = False + factor_feedback.final_feedback = anti_pattern_feedback + factor_feedback.final_decision_based_on_gt = gt_implementation is not None + return factor_feedback + # 1. Get factor execution feedback to generated implementation and remove the long list of numbers in execution feedback ( execution_feedback, diff --git a/rdagent/scenarios/qlib/experiment/prompts.yaml b/rdagent/scenarios/qlib/experiment/prompts.yaml index ae173a77b..2703c8b53 100644 --- a/rdagent/scenarios/qlib/experiment/prompts.yaml +++ b/rdagent/scenarios/qlib/experiment/prompts.yaml @@ -65,13 +65,34 @@ qlib_factor_strategy: |- other_df['datetime'] = pd.to_datetime(other_df['datetime']) ``` - 5. **Merge operations**: + 5. **Merge operations**: ```python # When merging DataFrames, ensure you are merging on both 'datetime' and 'instrument'. # If these are part of the index, reset the index before merging. merged_df = pd.merge(df, other_df, on=['datetime', 'instrument'], how='inner') ``` + 6. **Avoid per-instrument or per-day model retraining (performance critical)**: + The factor MUST be computable in a single vectorized pass over the full + `(datetime, instrument)` panel. If the factor involves an ML estimator + (e.g. LSTM, GRU, Transformer, RandomForest, XGBoost, LightGBM, MLP), fit + the model exactly ONCE on the full panel and batch-predict for every + `(datetime, instrument)` row in a single call. Do NOT wrap `model.fit(...)` + or estimator construction inside `for instrument in instruments:` or + `for day in trading_days:` loops -- on a realistic 5K-instrument x 1K-day + A-share panel that creates O(instruments * days) fit calls (~10^8 training + iterations) and the run hangs at 100% CPU for hours instead of finishing + in minutes. Use either: + ```python + # Pre-fit once on the full panel, then vectorized predict. + model = SomeEstimator(...).fit(X_panel, y_panel) + df['factor_value'] = model.predict(X_panel) + ``` + or restrict ML usage to lightweight rolling closed-form estimators that + can be expressed with `groupby(level='instrument').rolling(...).apply(...)` + or pandas/numpy vectorized ops -- never a Python-level nested loop that + re-instantiates and re-trains the estimator per stock or per date. + qlib_factor_output_format: |- Your output should be a pandas dataframe similar to the following example information: diff --git a/test/utils/coder/test_factor_antipattern.py b/test/utils/coder/test_factor_antipattern.py new file mode 100644 index 000000000..8c2183934 --- /dev/null +++ b/test/utils/coder/test_factor_antipattern.py @@ -0,0 +1,139 @@ +"""Regression tests for issue #1407. + +The factor coder occasionally generates ML-based factors whose +implementation wraps model construction or training inside a nested loop +over instruments and time. On a realistic 5K-instrument / 1K-day A-share +panel that produces O(instruments * days) training iterations and the run +hangs for hours. ``detect_per_instrument_training_antipattern`` catches +the pattern statically so CoSTEER can surface critic feedback before +spending the wall-clock cost of executing the offending code. +""" + +from __future__ import annotations + +import unittest + +import pytest + +from rdagent.components.coder.factor_coder.evaluators import ( + detect_per_instrument_training_antipattern, +) + + +@pytest.mark.offline +class DetectPerInstrumentTrainingAntipatternTest(unittest.TestCase): + + def test_lstm_per_instrument_per_day_loop_is_flagged(self) -> None: + code = """ +import pandas as pd +import torch +import torch.nn as nn + + +def calculate_lstm_factor(df): + instruments = df.index.get_level_values('instrument').unique() + trading_days = df.index.get_level_values('datetime').unique() + result = pd.Series(index=df.index, dtype=float) + for instrument in instruments: + for day in trading_days: + model = nn.LSTM(input_size=5, hidden_size=8, num_layers=1) + optimizer = torch.optim.Adam(model.parameters()) + # ... train per (instrument, day) ... + model.fit(df.loc[(slice(None, day), instrument)]) + pred = model.predict(df.loc[(day, instrument)]) + result.loc[(day, instrument)] = pred + return result +""" + + feedback = detect_per_instrument_training_antipattern(code) + self.assertIsNotNone(feedback) + self.assertIn("anti-pattern", feedback) + self.assertIn("#1407", feedback) + + def test_random_forest_per_stock_retraining_is_flagged(self) -> None: + code = """ +from sklearn.ensemble import RandomForestRegressor + + +def calculate_rf_factor(df): + for stock_code in df.index.get_level_values('instrument').unique(): + for day in df.index.get_level_values('datetime').unique(): + rf = RandomForestRegressor(n_estimators=100) + rf.fit(features, target) + preds.append(rf.predict(features)) + return preds +""" + + feedback = detect_per_instrument_training_antipattern(code) + self.assertIsNotNone(feedback) + + def test_xgboost_per_ticker_loop_is_flagged(self) -> None: + code = """ +import xgboost as xgb + + +def calculate_factor(df): + for ticker in tickers: + for d in days: + booster = xgb.XGBRegressor(n_estimators=200) + booster.fit(X_train, y_train) + out[ticker, d] = booster.predict(X_test)[0] + return out +""" + + feedback = detect_per_instrument_training_antipattern(code) + self.assertIsNotNone(feedback) + + def test_panel_level_single_fit_is_allowed(self) -> None: + # The recommended pattern -- one fit on the full panel, then batch + # predict -- must not be flagged. + code = """ +from sklearn.ensemble import RandomForestRegressor + + +def calculate_factor(panel_df, X, y): + model = RandomForestRegressor(n_estimators=200) + model.fit(X, y) + panel_df['factor'] = model.predict(X) + return panel_df +""" + + self.assertIsNone(detect_per_instrument_training_antipattern(code)) + + def test_nested_loop_without_training_call_is_allowed(self) -> None: + # Nested iteration over instruments and dates with only statistical + # operations (no .fit / no estimator constructor) must not be flagged. + code = """ +def calculate_momentum(df): + out = {} + for instrument in df.index.get_level_values('instrument').unique(): + for day in df.index.get_level_values('datetime').unique(): + out[(day, instrument)] = df.loc[(day, instrument), 'close'].pct_change(20).mean() + return out +""" + + self.assertIsNone(detect_per_instrument_training_antipattern(code)) + + def test_groupby_rolling_apply_is_allowed(self) -> None: + code = """ +def calculate_rolling_factor(df): + return ( + df.groupby(level='instrument')['close'] + .rolling(20) + .apply(lambda x: x.mean()) + ) +""" + + self.assertIsNone(detect_per_instrument_training_antipattern(code)) + + def test_syntax_error_returns_none(self) -> None: + # Syntax-broken code is handled by the normal execution path -- the + # detector must not raise on it. + self.assertIsNone(detect_per_instrument_training_antipattern("def broken(")) + + def test_empty_code_returns_none(self) -> None: + self.assertIsNone(detect_per_instrument_training_antipattern("")) + + +if __name__ == "__main__": + unittest.main()