Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,17 @@ def _resolve_destination_path(self, source_object: str, prefix: str | None = Non
source_object = os.path.relpath(source_object, start=prefix)
else:
source_object = os.path.basename(source_object)
return os.path.join(self.destination_path, source_object)
# Source object names come from the GCS bucket and may contain ".." segments.
# Normalize the joined path and make sure it stays within destination_path so a
# crafted object name cannot resolve a write target outside the configured directory.
resolved = os.path.normpath(os.path.join(self.destination_path, source_object))
base = os.path.normpath(self.destination_path)
if resolved != base and not resolved.startswith(base + os.sep):
raise ValueError(
f"Resolved destination path {resolved!r} is outside the configured "
f"destination_path {base!r}; refusing to write outside it."
)
return resolved

def _copy_single_object(
self,
Expand Down
32 changes: 32 additions & 0 deletions providers/samba/tests/unit/samba/transfers/test_gcs_to_samba.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,35 @@ def test_execute_more_than_one_wildcard_exception(self, samba_hook_mock, gcs_hoo
)
with pytest.raises(AirflowException):
operator.execute(None)

@pytest.mark.parametrize(
    "source_object",
    [
        "../../victim_area/payload",
        "../escape",
        "subdir/../../escape",
    ],
)
def test_resolve_destination_path_rejects_traversal(self, source_object):
    """Object names containing ``..`` segments that climb out of the destination must raise ValueError."""
    # Collect the constructor arguments first so the operator setup reads as data.
    operator_kwargs = {
        "task_id": TASK_ID,
        "source_bucket": TEST_BUCKET,
        "source_object": source_object,
        "destination_path": DESTINATION_SMB,
        "gcp_conn_id": GCP_CONN_ID,
        "samba_conn_id": SAMBA_CONN_ID,
    }
    transfer = GCSToSambaOperator(**operator_kwargs)

    # The error message is matched so an unrelated ValueError does not satisfy the test.
    with pytest.raises(ValueError, match="outside the configured"):
        transfer._resolve_destination_path(source_object)

def test_resolve_destination_path_allows_contained_object(self):
    """An object name without traversal segments resolves to the plain join under the destination."""
    object_name = "dir/file.txt"
    transfer = GCSToSambaOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=object_name,
        destination_path=DESTINATION_SMB,
        gcp_conn_id=GCP_CONN_ID,
        samba_conn_id=SAMBA_CONN_ID,
    )

    actual = transfer._resolve_destination_path(object_name)
    expected = os.path.join(DESTINATION_SMB, "dir/file.txt")
    assert actual == expected
Loading