feat(exporter): add content-app download log exporter #1258
Reviewer's Guide
Adds a new export-content-logs CLI entrypoint that queries CloudWatch Logs Insights for pulp-content download logs, parses Python wheel and RPM filenames into typed PyArrow schemas, and writes them to Parquet, while also fixing time parsing and CloudWatch query configuration issues.
Sequence diagram for export-content-logs CLI content export flow
sequenceDiagram
actor User
participant content_main
participant parse_time
participant build_content_query
participant fetch_cloudwatch_logs
participant convert_content_to_arrow_table
participant write_parquet
User->>content_main: export-content-logs
content_main->>parse_time: parse_time(start_time)
content_main->>parse_time: parse_time(end_time)
content_main->>build_content_query: build_content_query()
build_content_query-->>content_main: query
content_main->>fetch_cloudwatch_logs: fetch_cloudwatch_logs(log_group, query, start_time, end_time, region)
fetch_cloudwatch_logs-->>content_main: results
alt results empty
content_main-->>User: "No logs found"
else results present
content_main->>convert_content_to_arrow_table: convert_content_to_arrow_table(results, content_type)
convert_content_to_arrow_table-->>content_main: table
alt table length is 0
content_main-->>User: "No downloads found"
else table has rows
content_main->>write_parquet: write_parquet(table, output_path, s3_credentials)
write_parquet-->>content_main: success
content_main-->>User: "Export completed successfully"
end
end
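The control flow in the diagram can be sketched in Python as follows. This is an illustration only: `run_export` and its three injected callables are hypothetical stand-ins for `content_main` and the helpers named in the diagram, whose real signatures are not shown on this page.

```python
from typing import Callable, Sequence


def run_export(
    fetch: Callable[[], Sequence[dict]],
    convert: Callable[[Sequence[dict]], Sequence[dict]],
    write: Callable[[Sequence[dict]], None],
) -> str:
    """Mirror the diagram's two early exits and its success path.

    The callables are hypothetical; the real code passes the log group,
    query, time range, region, and S3 credentials explicitly.
    """
    results = fetch()
    if not results:
        # First early exit: CloudWatch returned nothing for the window.
        return "No logs found"

    table = convert(results)
    if len(table) == 0:
        # Second early exit: logs existed, but none were valid downloads.
        return "No downloads found"

    write(table)
    return "Export completed successfully"
```

Keeping the two empty cases separate matters for operators: "No logs found" points at the query or time range, while "No downloads found" points at filtering during conversion.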
Hey - I've found 4 issues, and left some high level feedback:
- The new `content_main`/`parse_content_args` flow duplicates a lot of the CLI wiring patterns from the existing commands (time parsing, S3 options, CloudWatch args); consider factoring the shared argument and S3-credential handling into reusable helpers to keep the CLI surface consistent and easier to maintain.
- In `convert_content_to_arrow_table`, most reasons for skipping a record (non-matching log line, non-content path, unsupported filename extension) are silent while malformed filenames emit warnings; if these skipped cases are significant operationally, you may want to add optional debug logging or counters so operators can distinguish “no data” from “heavily filtered data.”
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new `content_main`/`parse_content_args` flow duplicates a lot of the CLI wiring patterns from the existing commands (time parsing, S3 options, CloudWatch args); consider factoring the shared argument and S3-credential handling into reusable helpers to keep the CLI surface consistent and easier to maintain.
- In `convert_content_to_arrow_table`, most reasons for skipping a record (non-matching log line, non-content path, unsupported filename extension) are silent while malformed filenames emit warnings; if these skipped cases are significant operationally, you may want to add optional debug logging or counters so operators can distinguish “no data” from “heavily filtered data.”
## Individual Comments
### Comment 1
<location path="management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/content_cloudwatch.py" line_range="50-53" />
<code_context>
+ return org_value
+
+
+def _parse_timestamp(timestamp_str):
+ if timestamp_str.endswith("Z"):
+ timestamp_str = timestamp_str[:-1]
+ return datetime.fromisoformat(timestamp_str)
+
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle invalid or missing @timestamp values more defensively.
`datetime.fromisoformat` will raise `ValueError` for missing, empty, or non-ISO `@timestamp` values, which will abort the export since `timestamp_str` comes from `result.get("@timestamp", "")`. Please guard this with try/except and either skip such records or fall back to another timestamp source so a single bad record doesn’t fail the run.
</issue_to_address>
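A minimal sketch of the defensive parsing requested in Comment 1, assuming the caller skips any record for which the parser returns `None`. The name `safe_parse_timestamp` is hypothetical and is not taken from the PR.

```python
from datetime import datetime
from typing import Optional


def safe_parse_timestamp(timestamp_str: Optional[str]) -> Optional[datetime]:
    """Return a datetime, or None when the value is missing or not ISO-formatted."""
    if not timestamp_str:
        # Covers both None and the "" default from result.get("@timestamp", "").
        return None
    if timestamp_str.endswith("Z"):
        timestamp_str = timestamp_str[:-1]
    try:
        return datetime.fromisoformat(timestamp_str)
    except (ValueError, TypeError):
        # A single bad record should not abort the whole export.
        return None
```

Returning `None` rather than raising lets the conversion loop count the record as a bad-timestamp skip and continue.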
### Comment 2
<location path="management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/content_cloudwatch.py" line_range="68-72" />
<code_context>
+ records = []
+ skipped_count = 0
+
+ for result in results:
+ message = result.get("message", result.get("@message", ""))
+ timestamp_str = result.get("@timestamp", "")
+
+ parsed_line = parse_content_log_line(message)
+ if parsed_line is None:
+ continue
</code_context>
<issue_to_address>
**suggestion:** Consider tracking or logging counts for records skipped due to log-line/path parse failures.
Right now only malformed filenames increment `skipped_count`; failures in `parse_content_log_line` / `parse_content_path` are ignored. Adding separate counters for these cases and including them in the final summary (like the filename warning) would make format drifts or unexpected volume drops easier to spot and debug.
Suggested implementation:
```python
schema = SCHEMAS[content_type]
filename_parser = FILENAME_PARSERS[content_type]
records = []
skipped_count = 0
log_line_parse_skipped_count = 0
path_parse_skipped_count = 0
```
```python
parsed_line = parse_content_log_line(message)
if parsed_line is None:
    log_line_parse_skipped_count += 1
    continue
```
```python
parsed_path = parse_content_path(parsed_line["path"])
if parsed_path is None:
    path_parse_skipped_count += 1
    continue
```
To fully implement the suggestion, you should:
1. Include `log_line_parse_skipped_count` and `path_parse_skipped_count` in whatever summary or warning is logged/emitted at the end of `convert_content_to_arrow_table`, similar to how `skipped_count` is currently surfaced.
2. Consider renaming `skipped_count` to something more specific (e.g. `filename_skipped_count`) in that summary to clearly distinguish between filename, log-line, and path parse failures.
3. If there are unit tests asserting the summary or logged message contents, update them to expect the new counters and their wording.
</issue_to_address>
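As an alternative to one integer variable per category, the counters from Comment 2 could be kept in a single `collections.Counter`. The sketch below is an assumption about how that might look; the function names and reason labels are hypothetical, and the summary wording is not the PR's actual output.

```python
import sys
from collections import Counter
from typing import Iterable, Optional, TextIO


def count_skips(reasons: Iterable[Optional[str]]) -> Counter:
    """Tally skip reasons; None means the record was kept."""
    return Counter(reason for reason in reasons if reason is not None)


def format_skip_summary(skips: Counter, total: int) -> str:
    """Build a one-line summary with reasons in alphabetical order."""
    kept = total - sum(skips.values())
    parts = [f"{reason}={count}" for reason, count in sorted(skips.items())]
    detail = ", ".join(parts) if parts else "none"
    return f"kept {kept} of {total} records; skipped: {detail}"


def report_skips(skips: Counter, total: int, stream: TextIO = sys.stderr) -> None:
    """Print the summary to stderr so it stays out of any piped output."""
    print(format_skip_summary(skips, total), file=stream)
```

A single counter keeps the summary code unchanged when a new skip category is added, at the cost of string keys instead of named variables.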
### Comment 3
<location path="management_tools/pulp-access-logs-exporter/tests/test_content_export.py" line_range="186-195" />
<code_context>
+class TestContentToParquetPython:
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for malformed or non-download records to verify they are skipped and warnings are emitted
Currently `TestContentToParquetPython` (and RPM) only covers happy-path and empty results. Since `convert_content_to_arrow_table` is supposed to skip bad records and warn on malformed filenames, please add tests that: feed in an invalid `message` (no `CONTENT_LOG_REGEX` match), a valid log line with a non-content path, and a `.whl`/`.rpm` filename that makes the filename parser return `None`; then assert that these rows are excluded from the table, the row count matches only valid downloads, and optionally that the malformed-filename warning is emitted to `stderr` (via `capsys`/`capfd`).
Suggested implementation:
```python
class TestContentToParquetPython:
    def test_converts_python_downloads(self, sample_content_cloudwatch_results, sample_content_parquet_path):
        table = convert_content_to_arrow_table(
            sample_content_cloudwatch_results, "python"
        )
        assert table.num_rows == 2
        assert table.schema == PYTHON_SCHEMA
        write_parquet(table, sample_content_parquet_path)
        read_table = pq.read_table(sample_content_parquet_path)
        assert read_table.num_rows == 2

    def test_skips_invalid_message_records(self, sample_content_cloudwatch_results):
        # Add a record that does not match CONTENT_LOG_REGEX at all
        invalid_record = {
            "message": "this is not a valid access log line and should be ignored"
        }
        mixed_results = sample_content_cloudwatch_results + [invalid_record]
        table = convert_content_to_arrow_table(mixed_results, "python")
        # Only the original valid python download records should be present
        assert table.num_rows == 2

    def test_skips_non_content_path_records(self, sample_content_cloudwatch_results):
        # Start from a valid record and replace the path with a non-content path
        # so the regex still matches but parse_content_path returns None
        base_message = sample_content_cloudwatch_results[0]["message"]
        non_content_path = "/api/pypi/default/dist/simple/pkg/"
        non_content_message = base_message.replace("/pulp/content/", non_content_path)
        non_content_record = {"message": non_content_message}
        mixed_results = sample_content_cloudwatch_results + [non_content_record]
        table = convert_content_to_arrow_table(mixed_results, "python")
        # Non-content paths should be skipped
        assert table.num_rows == 2

    def test_skips_malformed_filename_and_warns(
        self, sample_content_cloudwatch_results, capsys
    ):
        # Use a valid record but make the filename malformed so the filename parser returns None
        base_message = sample_content_cloudwatch_results[0]["message"]
        # Intentionally break the filename while keeping the .whl extension so the
        # content-type filter still considers it a python download
        malformed_message = base_message.replace(".whl", "-malformed-file-name.whl")
        malformed_record = {"message": malformed_message}
        mixed_results = sample_content_cloudwatch_results + [malformed_record]
        table = convert_content_to_arrow_table(mixed_results, "python")
        # Only well-formed python downloads should be present
        assert table.num_rows == 2
        captured = capsys.readouterr()
        # Ensure a warning about the malformed filename was emitted
        assert "malformed" in captured.err.lower()
```
These tests assume:
1. `sample_content_cloudwatch_results[0]["message"]` contains the substring `/pulp/content/` which can be replaced with a non-content path for the non-content test. If the actual content path prefix differs, adjust the `.replace("/pulp/content/", non_content_path)` call to match the real prefix used in your logs.
2. The warning for malformed filenames contains the word `"malformed"` in `stderr`. If the real warning message is different, update the final assertion in `test_skips_malformed_filename_and_warns` to match the actual warning text (for example, assert on the exact message or a more specific substring).
3. If warnings are logged via `logging` instead of `stderr`, replace the `capsys` usage with `caplog` and assert on `caplog.records` or `caplog.text` accordingly.
</issue_to_address>
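To make "a filename that makes the filename parser return `None`" concrete, here is a rough sketch of a wheel filename splitter. It assumes the standard wheel naming layout (`name-version[-build]-python-abi-platform.whl`); `split_wheel_filename` and `WheelInfo` are hypothetical names, and the parsers registered in the PR's `FILENAME_PARSERS` may behave differently.

```python
from typing import NamedTuple, Optional


class WheelInfo(NamedTuple):
    name: str
    version: str
    python_tag: str
    abi_tag: str
    platform_tag: str


def split_wheel_filename(filename: str) -> Optional[WheelInfo]:
    """Split a wheel filename into its tags, or return None if malformed."""
    if not filename.endswith(".whl"):
        return None
    parts = filename[: -len(".whl")].split("-")
    # Five components without a build tag, six with one; none may be empty.
    if len(parts) not in (5, 6) or not all(parts):
        return None
    name, version = parts[0], parts[1]
    python_tag, abi_tag, platform_tag = parts[-3:]
    return WheelInfo(name, version, python_tag, abi_tag, platform_tag)
```

Under this sketch, appending `-malformed-file-name` before `.whl`, as the suggested test does, yields too many components and therefore `None`. Whether the PR's real parser rejects that name is something the test author would need to confirm.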
### Comment 4
<location path="management_tools/pulp-access-logs-exporter/tests/test_content_export.py" line_range="211-220" />
<code_context>
+class TestContentToParquetRpm:
</code_context>
<issue_to_address>
**suggestion (testing):** Cover edge cases for cache/size/org_id parsing (e.g. '-', empty, or unexpected values)
Existing Parquet tests cover only `HIT`/`MISS` and valid org IDs. In `content_cloudwatch`, the normalization helpers (`_parse_cache_hit`, `_parse_artifact_size`, `_parse_org_id`) also handle:
- `artifact_size` of `"-"` or empty → `None`
- `rh_org_id` of `"-"` or empty → `None`
- `cache` values other than `HIT`/`MISS` → `None`
Add one or two tests that feed CloudWatch records with these edge values and assert the resulting table has `None` in the corresponding columns to capture this behavior and guard against future format changes.
Suggested implementation:
```python
class TestContentToParquetRpm:
    def test_converts_rpm_downloads(self, sample_content_cloudwatch_results, sample_content_parquet_path):
        table = convert_content_to_arrow_table(
            sample_content_cloudwatch_results, "rpm"
        )
        assert table.num_rows == 2
        assert table.schema == RPM_SCHEMA
        write_parquet(table, sample_content_parquet_path)
        read_table = pq.read_table(sample_content_parquet_path)
        assert read_table.num_rows == 2

    def test_parses_edge_values_for_cache_size_and_org_id(
        self,
        sample_content_cloudwatch_results,
    ):
        # Create a record with edge values for cache/size/org_id
        edge_record = copy.deepcopy(sample_content_cloudwatch_results[0])
        edge_record["cache"] = "UNKNOWN"  # unexpected cache value -> None
        edge_record["artifact_size"] = "-"  # "-" size -> None
        edge_record["rh_org_id"] = "-"  # "-" org_id -> None
        records = list(sample_content_cloudwatch_results) + [edge_record]
        table = convert_content_to_arrow_table(records, "rpm")
        rows = table.to_pydict()
        # Original rows are still parsed as before
        assert table.num_rows == 3
        assert rows["cache_hit"][0] is True
        assert rows["cache_hit"][1] is False
        # Edge row normalized to None
        assert rows["cache_hit"][2] is None
        assert rows["artifact_size"][2] is None
        assert rows["org_id"][2] is None

    def test_parses_empty_values_for_size_and_org_id(
        self,
        sample_content_cloudwatch_results,
    ):
        # Create a record with empty values for size/org_id and "-" cache
        edge_record = copy.deepcopy(sample_content_cloudwatch_results[0])
        edge_record["cache"] = "-"  # "-" cache -> None
        edge_record["artifact_size"] = ""  # empty size -> None
        edge_record["rh_org_id"] = ""  # empty org_id -> None
        records = list(sample_content_cloudwatch_results) + [edge_record]
        table = convert_content_to_arrow_table(records, "rpm")
        rows = table.to_pydict()
        assert table.num_rows == 3
        assert rows["cache_hit"][2] is None
        assert rows["artifact_size"][2] is None
        assert rows["org_id"][2] is None
```
1. Ensure `import copy` is added at the top of `test_content_export.py`, e.g.:
`import copy`
2. If the fields in `sample_content_cloudwatch_results` are nested (e.g. under a `"fields"` or similar key), update the `edge_record[...]` assignments to match the actual structure, so that the `cache`, `artifact_size`, and `rh_org_id` values used by `convert_content_to_arrow_table` are correctly overridden.
</issue_to_address>
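The normalization rules listed in Comment 4 can be written down directly. The sketch below uses hypothetical names (`normalize_cache_hit`, `normalize_artifact_size`, `normalize_org_id`) rather than the PR's private helpers, and treating a non-numeric size as `None` is an added assumption that the review does not state.

```python
from typing import Optional


def normalize_cache_hit(value: Optional[str]) -> Optional[bool]:
    """HIT -> True, MISS -> False, anything else -> None."""
    if value == "HIT":
        return True
    if value == "MISS":
        return False
    return None


def normalize_artifact_size(value: Optional[str]) -> Optional[int]:
    """'-' or empty -> None; otherwise the size as an integer."""
    if value is None or value in ("", "-"):
        return None
    try:
        return int(value)
    except ValueError:
        # Assumption: an unparseable size is treated like a missing one.
        return None


def normalize_org_id(value: Optional[str]) -> Optional[str]:
    """'-' or empty -> None; otherwise the org ID unchanged."""
    if value is None or value in ("", "-"):
        return None
    return value
```

Testing helpers like these directly avoids depending on how the fixture records are structured, which is the caveat raised in point 2 above.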
Addressed Sourcery review feedback in 5b3a6e2:
- Comment 1 (bug_risk, defensive timestamp parsing): Fixed.
- Comment 2 (skip counters): Fixed. Added 5 per-category counters (log parse, path parse, extension, malformed filename, bad timestamp) printed to stderr for operational visibility.
- Comment 3 (tests for malformed records): Partially addressed. Added 3 tests: invalid/non-matching records, malformed filename warning (capsys), and bad timestamps. Skipped the suggested non-content-path test: the suggested implementation had a bug (
- Comment 4 (edge case tests for helpers): Not addressing. The suggested implementation tries to

/retest
New `export-content-logs` CLI that exports Python (.whl) and RPM (.rpm) download logs from the content-app (pulp-content) to Parquet, with parsed package metadata (name, version, architecture) from filenames. Separate from the existing PyPI API exporter: this captures actual downloads with artifact_size, cache hit/miss, and version details that API logs lack.

Fixes:
- Use logGroupNames (plural) for JSON-structured log support
- Use timezone-aware datetime to fix epoch offset in non-UTC systems

PULP-1811
Assisted-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
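The epoch-offset fix follows from how Python treats naive datetimes: `datetime.utcnow()` returns a value with no `tzinfo`, and calling `.timestamp()` on a naive value interprets it as local time, so the result is shifted on any machine not set to UTC. The sketch below shows one way to avoid that; `utc_now` and `to_epoch_seconds` are hypothetical helpers, not the PR's `parse_time()`.

```python
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Timezone-aware replacement for datetime.utcnow()."""
    return datetime.now(timezone.utc)


def to_epoch_seconds(moment: datetime) -> int:
    """Convert to epoch seconds, treating naive values as UTC explicitly."""
    if moment.tzinfo is None:
        # Assumption for this sketch: naive inputs are meant as UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    return int(moment.timestamp())
```

With an aware datetime the conversion no longer depends on the host's timezone setting, which matters because the query's start and end times are sent to CloudWatch as epoch values.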
Address Sourcery review feedback:
- Add per-category skip counters (log parse, path parse, extension, malformed filename, bad timestamp) printed to stderr for operational visibility
- Handle empty or malformed @timestamp values gracefully instead of crashing the entire export
- Add tests for malformed filenames, invalid records, and bad timestamps

Assisted-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
5b3a6e2 to 893b120
Summary
- `export-content-logs` CLI command that exports Python (.whl/.whl.metadata) and RPM (.rpm) download logs from content-app (pulp-content) CloudWatch streams to Parquet
- `parse_time()` (utcnow → timezone-aware) and switches to `logGroupNames` (plural) for JSON-structured log support
Test plan
Jira: PULP-1811
Epic: PULP-1809
🤖 Generated with Claude Code
Summary by Sourcery
Add a new CLI command for exporting pulp-content download logs from CloudWatch to Parquet, with typed schemas for Python wheels and RPMs, and improve time handling and CloudWatch querying.
New Features:
Bug Fixes:
Tests: