Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 210 additions & 0 deletions compare_alpha_baselines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""
Compare ALPHA20 vs ALPHA158 baseline backtest performance.

This script runs backtests for both factor sets and compares key metrics:
- 年化收益 (Annualized Return)
- 最大回撤 (Maximum Drawdown)
- 信息比率 (Information Ratio)
- IC均值 (Mean IC)
- ICIR (IC Information Ratio)

Usage:
python compare_alpha_baselines.py

Note: Requires Docker environment with qlib image prepared.
"""

from __future__ import annotations

import sys
from typing import Any

import docker
from rdagent.log import rdagent_logger as logger
from rdagent.scenarios.qlib.developer.factor_runner import QlibFactorRunner
from rdagent.scenarios.qlib.experiment.factor_experiment import QlibFactorExperiment
from rdagent.utils.qlib import ALPHA20, ALPHA158

# Train/valid/test date windows and market universe reported in the banner
# printed by main().
# NOTE(review): within this script these values are only printed; nothing
# visible here passes them to the experiment or the runner — confirm the
# backtests actually use these periods and this market.
BACKTEST_CONFIG = {
"train_start": "2024-01-01",
"train_end": "2024-12-31",
"valid_start": "2025-01-01",
"valid_end": "2025-06-30",
"test_start": "2025-07-01",
"test_end": "2026-03-30",
"market": "csi500",
}


def create_experiment(
    factor_dict: dict[str, str],
    name: str,
) -> QlibFactorExperiment:
    """
    Build a QlibFactorExperiment whose base features are the given factors.

    Args:
        factor_dict: Mapping of factor name to factor expression. A shallow
            copy is stored on the experiment, so the caller's mapping is not
            shared with it.
        name: Human-readable label; used only in the log message.

    Returns:
        A new QlibFactorExperiment with ``base_features`` set.
    """
    logger.info(f"Creating experiment: {name} with {len(factor_dict)} factors")

    experiment = QlibFactorExperiment()
    # NOTE(review): other code in this change reads ``exp.base_feature_codes``;
    # confirm ``base_features`` is the attribute the runner actually consumes.
    experiment.base_features = factor_dict.copy()
    return experiment


def run_backtest(exp: QlibFactorExperiment) -> dict[str, Any]:
    """
    Execute the experiment through QlibFactorRunner and summarise the outcome.

    Args:
        exp: Experiment to run.

    Returns:
        The metrics extracted from the run's result when one is produced.
        Otherwise a dict with a single ``"error"`` entry holding either the
        run's stdout (no result produced) or the exception text (a
        RuntimeError, ValueError or KeyError was raised).
    """
    runner = QlibFactorRunner()

    try:
        finished = runner.develop(exp)
        if finished.result is not None:
            return extract_metrics(finished.result)
        logger.error(f"Backtest failed: {finished.stdout}")
    except (RuntimeError, ValueError, KeyError) as e:
        # Only these exception types are converted into an error entry; any
        # other exception propagates to the caller.
        logger.error(f"Exception during backtest: {e}")
        return {"error": str(e)}

    # Reached only when the run finished without raising but had no result.
    return {"error": finished.stdout}


def extract_metrics(result: Any) -> dict[str, Any]:
    """
    Pick the headline metrics out of a backtest result.

    Each label of ``result.index`` is lower-cased and matched against a fixed
    set of rules; the value is read with ``result[label]``. When several
    labels satisfy the same rule, the last one wins.

    Args:
        result: Object exposing an iterable ``index`` of labels and supporting
            ``result[label]`` lookups (e.g. a pandas Series). Objects without
            such an index (``None``, lists, strings, ...) yield no metrics.

    Returns:
        Dictionary keyed by the display names "年化收益", "最大回撤", "信息比率",
        "IC均值" and "ICIR"; only the metrics that were found are present.
    """
    metrics: dict[str, Any] = {}

    index = getattr(result, "index", None)
    # list/str/tuple expose ``index`` as a *method*; treat those as "no index"
    # instead of failing when trying to iterate over it.
    if index is None or callable(index):
        return metrics

    # Display name -> predicate over the lower-cased label.
    # NOTE(review): the substring rules are broad — e.g. any label containing
    # "icir" (such as one prefixed with "rank ") also feeds "ICIR". Confirm
    # this is intended for the result keys the runner produces.
    matchers = {
        "年化收益": lambda k: "annualized_return" in k,
        "最大回撤": lambda k: "max_drawdown" in k,
        "信息比率": lambda k: "information_ratio" in k,
        "IC均值": lambda k: k == "ic.mean" or "ic_mean" in k,
        "ICIR": lambda k: k == "ic.ir" or "icir" in k,
    }

    for key in index:
        # Labels are not guaranteed to be strings (ints, tuples, ...).
        lowered = str(key).lower()
        for label, matches in matchers.items():
            if matches(lowered):
                metrics[label] = result[key]

    return metrics


def format_percentage(value: Any) -> str:
    """Render a ratio as a percentage with two decimals (0.1234 -> "12.34%").

    ``None`` becomes "N/A"; values that cannot be converted to float are
    returned as their ``str()`` form.
    """
    if value is None:
        return "N/A"

    try:
        ratio = float(value)
    except (TypeError, ValueError):
        return str(value)

    return f"{ratio * 100:.2f}%"


def format_number(value: Any) -> str:
    """Render a value as a float with two decimals (1.5 -> "1.50").

    ``None`` becomes "N/A"; values that cannot be converted to float are
    returned as their ``str()`` form.
    """
    if value is None:
        return "N/A"

    try:
        number = float(value)
    except (TypeError, ValueError):
        return str(value)

    return f"{number:.2f}"


def print_comparison_table(
    alpha20_results: dict[str, Any],
    alpha158_results: dict[str, Any],
) -> None:
    """
    Print a side-by-side table of the ALPHA20 and ALPHA158 metrics.

    Return and drawdown rows are formatted as percentages, the remaining rows
    as plain numbers; a metric missing from a result dict is shown as "N/A".
    If a result dict carries an ``"error"`` entry, that message is printed
    below the table so an all-"N/A" column is explained.

    Args:
        alpha20_results: Metrics (or an error entry) from the ALPHA20 backtest.
        alpha158_results: Metrics (or an error entry) from the ALPHA158 backtest.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("ALPHA20 vs ALPHA158 Baseline Comparison")
    print(rule)

    print(f"| {'Metric':<15} | {'ALPHA20':<12} | {'ALPHA158':<12} |")
    print(f"|{'-' * 17}|{'-' * 14}|{'-' * 14}|")

    metrics_order = ["年化收益", "最大回撤", "信息比率", "IC均值", "ICIR"]
    percentage_metrics = {"年化收益", "最大回撤"}

    for metric in metrics_order:
        formatter = format_percentage if metric in percentage_metrics else format_number
        a20_str = formatter(alpha20_results.get(metric))
        a158_str = formatter(alpha158_results.get(metric))
        print(f"| {metric:<15} | {a20_str:<12} | {a158_str:<12} |")

    print(rule)

    # A failed run yields a dict holding only an "error" entry; report it here
    # rather than leaving the reader with unexplained "N/A" cells.
    for label, results in (("ALPHA20", alpha20_results), ("ALPHA158", alpha158_results)):
        if "error" in results:
            print(f"{label} backtest error: {results['error']}")

    print(f"\nFactor Count: ALPHA20 = {len(ALPHA20)}, ALPHA158 = {len(ALPHA158)}")


def main() -> None:
    """Run both baseline backtests and print the comparison.

    Steps: print the configuration banner, verify that a Docker daemon is
    reachable (exits with status 1 otherwise), build one experiment per
    factor set, run the two backtests in sequence and print the table.
    """
    banner = "=" * 60
    section_break = "\n" + "-" * 60

    print(banner)
    print("ALPHA Baseline Comparison Script")
    print(banner)
    # NOTE(review): BACKTEST_CONFIG is only displayed here; nothing visible in
    # this script applies it to the experiments — confirm the runner uses the
    # same periods and market.
    print("\nConfiguration:")
    print(f" Train period: {BACKTEST_CONFIG['train_start']} ~ {BACKTEST_CONFIG['train_end']}")
    print(f" Valid period: {BACKTEST_CONFIG['valid_start']} ~ {BACKTEST_CONFIG['valid_end']}")
    print(f" Test period: {BACKTEST_CONFIG['test_start']} ~ {BACKTEST_CONFIG['test_end']}")
    print(f" Market: {BACKTEST_CONFIG['market']}")
    print(f"\nALPHA20 factors: {len(ALPHA20)}")
    print(f"ALPHA158 factors: {len(ALPHA158)}")

    try:
        client = docker.from_env()
        try:
            client.ping()
        finally:
            # The client is needed only for this connectivity check; release
            # its underlying connections instead of leaking them.
            client.close()
        print("\n✓ Docker connection successful")
    except (docker.errors.DockerException, OSError) as e:
        print(f"\n⚠ Docker not available: {e}")
        print(" Backtests require Docker with qlib image")
        print(" Run: docker build -t local_qlib:latest -f Dockerfile_qlib .")
        sys.exit(1)

    print(section_break)
    print("Creating experiments...")
    alpha20_exp = create_experiment(ALPHA20, "ALPHA20")
    alpha158_exp = create_experiment(ALPHA158, "ALPHA158")

    print(section_break)
    print("Running ALPHA20 backtest...")
    alpha20_results = run_backtest(alpha20_exp)

    print(section_break)
    print("Running ALPHA158 backtest...")
    alpha158_results = run_backtest(alpha158_exp)

    print_comparison_table(alpha20_results, alpha158_results)


# Run the comparison only when executed as a script, not on import.
if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion rdagent/app/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ def ds_user_interact(port=19900):
@app.command(name="fin_factor")
def fin_factor_cli(
    path: Optional[str] = None,
    base_features_path: Optional[str] = None,
    step_n: Optional[int] = None,
    loop_n: Optional[int] = None,
    all_duration: Optional[str] = None,
    checkout: CheckoutOption = True,
):
    # CLI entry point for the factor loop: forwards every option unchanged to
    # fin_factor. Only the updated call is kept — the stale call that omitted
    # base_features_path would otherwise launch the loop a second time.
    # (Plain comments rather than a docstring so the generated CLI help text
    # is not altered.)
    fin_factor(
        path=path,
        base_features_path=base_features_path,
        step_n=step_n,
        loop_n=loop_n,
        all_duration=all_duration,
        checkout=checkout,
    )


@app.command(name="fin_model")
Expand Down
2 changes: 1 addition & 1 deletion rdagent/components/workflow/rd_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def _interact_init_params(self) -> None:
logger.info("Received user instruction response.")
self.plan.update(res_dict)

if "feature_codes" not in self.plan:
if "feature_codes" in self.plan:
self.plan[
"user_instruction"
] += f"\n\n{str(list(self.plan['feature_codes'].keys()))} has been configured as the base factor; do not generate duplicate factors."
Expand Down
47 changes: 37 additions & 10 deletions rdagent/scenarios/qlib/developer/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List
import re

import pandas as pd

from rdagent.components.coder.CoSTEER.evaluators import CoSTEERMultiFeedback
from rdagent.components.coder.factor_coder.factor import FactorFBWorkspace, FactorTask
from rdagent.core.conf import RD_AGENT_SETTINGS
Expand All @@ -11,6 +10,33 @@
from rdagent.scenarios.qlib.experiment.factor_experiment import QlibFactorExperiment


def _fix_groupby_rolling_pattern(code: str) -> str:
"""
Fix pandas groupby().rolling() patterns that cause index duplication.

Converts: .groupby(level='instrument').rolling(window=N).mean()
To: .groupby(level='instrument').transform(lambda x: x.rolling(window=N).mean())
"""
# Pattern to match: groupby(...).rolling(...).{mean|sum|std|min|max}()
pattern = (
r"\.groupby\s*\(\s*level\s*=\s*['\"]instrument['\"]\s*\)"
r"\s*\.\s*rolling\s*\(\s*window\s*=\s*(\d+)\s*\)"
r"\s*\.\s*(mean|sum|std|min|max)\s*\(\s*\)"
)

def replace_func(match: re.Match[str]) -> str:
window = match.group(1)
operation = match.group(2)
return f".groupby(level='instrument').transform(lambda x: x.rolling(window={window}).{operation}())"

fixed_code = re.sub(pattern, replace_func, code)

if fixed_code != code:
logger.info("Auto-fixed groupby().rolling() pattern to use transform()")

return fixed_code


def _build_base_feature_workspaces(exp: QlibFactorExperiment) -> list[FactorFBWorkspace]:
workspaces: list[FactorFBWorkspace] = []
for file_name, code in exp.base_feature_codes.items():
Expand All @@ -19,9 +45,10 @@ def _build_base_feature_workspaces(exp: QlibFactorExperiment) -> list[FactorFBWo
factor_name=file_name,
factor_description=f"Base feature from {file_name}",
factor_formulation="",
)
),
)
workspace.inject_files(**{"factor.py": code})
fixed_code = _fix_groupby_rolling_pattern(code)
workspace.inject_files(**{"factor.py": fixed_code})
workspaces.append(workspace)
return workspaces

Expand Down Expand Up @@ -54,13 +81,13 @@ def _resolve_index_level_values(df: pd.DataFrame, level_name: str) -> pd.Index |
if all(first_values.equals(values) for values in candidate_values[1:]):
logger.warning(
f"Factor dataframe has duplicated '{level_name}' index levels at positions {matching_levels}; "
"their values are identical, so the first one is used."
"their values are identical, so the first one is used.",
)
return first_values

logger.warning(
f"Skip factor dataframe because index has ambiguous duplicated '{level_name}' levels at positions "
f"{matching_levels}. index names={list(df.index.names)}"
f"{matching_levels}. index names={list(df.index.names)}",
)
return None

Expand Down Expand Up @@ -128,7 +155,7 @@ def _process_message_and_df(
return error_message


def process_factor_data(exp_or_list: List[QlibFactorExperiment] | QlibFactorExperiment) -> pd.DataFrame:
def process_factor_data(exp_or_list: list[QlibFactorExperiment] | QlibFactorExperiment) -> pd.DataFrame:
"""
Process and combine factor data from experiment implementations.

Expand Down Expand Up @@ -165,13 +192,13 @@ def process_factor_data(exp_or_list: List[QlibFactorExperiment] | QlibFactorExpe
except Exception as concat_error:
concat_index_info = " | ".join([f"df#{i}: {_format_index_info(df)}" for i, df in enumerate(factor_dfs)])
logger.warning(
f"Failed to concat factor data due to index misalignment. concat_error={concat_error}; collected_index_info={concat_index_info}"
f"Failed to concat factor data due to index misalignment. concat_error={concat_error}; collected_index_info={concat_index_info}",
)
raise FactorEmptyError(
"Failed to concat factor data due to index misalignment or incompatible index structure. "
f"concat_error={concat_error}; collected_index_info={concat_index_info}; details={error_message}"
f"concat_error={concat_error}; collected_index_info={concat_index_info}; details={error_message}",
) from concat_error
else:
raise FactorEmptyError(
f"No valid factor data found to merge (in process_factor_data) because of {error_message}."
f"No valid factor data found to merge (in process_factor_data) because of {error_message}.",
)
34 changes: 34 additions & 0 deletions rdagent/scenarios/qlib/experiment/prompts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,40 @@ qlib_factor_interface: |-
Your python code should follow the interface to better interact with the user's system.
Your python code should contain the following part: the import part, the function part, and the main part. You should write a main function name: "calculate_{function_name}" and call this function in "if __name__ == __main__" part. Don't write any try-except block in your python code. The user will catch the exception message and provide the feedback to you.
User will write your python code into a python file and execute the file directly with "python {your_file_name}.py". You should calculate the factor values and save the result into a HDF5(H5) file named "result.h5" in the same directory as your python file. The result file is a HDF5(H5) file containing a pandas dataframe. The index of the dataframe is the "datetime" and "instrument", and the single column name is the factor name, and the value is the factor value. The result file should be saved in the same directory as your python file.

**CRITICAL: Pandas MultiIndex groupby().rolling() Pattern**

When working with MultiIndexed Series (index: ['datetime', 'instrument']), you MUST use the correct pattern for rolling operations:

❌ WRONG - This causes "ValueError: The name instrument occurs multiple times":
```python
# DO NOT use this pattern - it creates a 3-level index with duplicate 'instrument'
ma_20 = volume.groupby(level='instrument').rolling(window=20).mean()
result = volume / ma_20 # FAILS!
```

✅ CORRECT - Use transform() to preserve the 2-level index structure:
```python
# CORRECT: Use transform() with lambda to preserve index structure
ma_20 = volume.groupby(level='instrument').transform(lambda x: x.rolling(window=20).mean())
result = volume / ma_20 # Works correctly!
```

The key difference:
- `groupby().rolling()` returns a Series with extra index level: ['instrument', 'datetime', 'instrument']
- `groupby().transform(lambda x: x.rolling().mean())` preserves original index: ['datetime', 'instrument']

For other rolling operations (sum, std, min, max, etc.), always use transform():
```python
# Rolling standard deviation
rolling_std = series.groupby(level='instrument').transform(lambda x: x.rolling(window=20).std())

# Rolling sum
rolling_sum = series.groupby(level='instrument').transform(lambda x: x.rolling(window=10).sum())

# Rolling min/max
rolling_min = series.groupby(level='instrument').transform(lambda x: x.rolling(window=5).min())
```

qlib_factor_strategy: |-
Ensure that for every step of data processing, the data format (including indexes) is clearly explained through comments.
Expand Down
Loading