From 9d343f4c2099c33761f6ac364a6772d81b43e602 Mon Sep 17 00:00:00 2001 From: atif09 Date: Mon, 15 Jun 2026 23:59:17 +0530 Subject: [PATCH 1/5] [feature] Add extraction metadata fields, state machine, and async extraction task #412 - Add extraction metadata fields to FirmwareImage (board, compatible, target, fw_version, compat_version, source, extraction_status, extraction_log, failure_reason) - Add state machine with statuses: unconfirmed, in_progress, success, failed, invalid, manually_confirmed - Add unique_together on (build, type) to prevent duplicate image uploads - Add _validate_locked() to make metadata fields read-only after confirmation, allowing empty fields to be filled - Add trigger_metadata_extraction post_save signal to schedule Celery task and set build status to analyzing - Add auto_create_device_firmware signal to trigger device pairing on confirmed status - Ad extract_firmware_metadata Celery task with handling for different exception cases - Add _compat_blocks_pairing() to skip device pairing when compat_version > 1.0 Related to #412 --- openwisp_firmware_upgrader/admin.py | 196 +++++++++++++++- openwisp_firmware_upgrader/apps.py | 16 ++ openwisp_firmware_upgrader/base/models.py | 220 +++++++++++++++++- ...ild_status_firmwareimage_board_and_more.py | 100 ++++++++ openwisp_firmware_upgrader/tasks.py | 152 ++++++++++++ openwisp_firmware_upgrader/tests/base.py | 2 + .../tests/test_admin.py | 141 ++++++++++- openwisp_firmware_upgrader/tests/test_api.py | 6 +- .../tests/test_extractors.py | 5 + .../tests/test_models.py | 181 ++++++++++++-- .../tests/test_tasks.py | 196 ++++++++++++++++ .../openwisp2/sample_connection/api/views.py | 2 +- ...ild_status_firmwareimage_board_and_more.py | 100 ++++++++ 13 files changed, 1281 insertions(+), 36 deletions(-) create mode 100644 openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py create mode 100644 tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py diff --git a/openwisp_firmware_upgrader/admin.py b/openwisp_firmware_upgrader/admin.py index ea25e5dc5..7825e0a13 100644 --- a/openwisp_firmware_upgrader/admin.py +++ b/openwisp_firmware_upgrader/admin.py @@ -38,6 +38,7 @@ LocationFilter, ) from .swapper import load_model +from .tasks import extract_firmware_metadata from .utils import get_upgrader_schema_for_device from .widgets import FirmwareSchemaWidget, MassUpgradeSelect2Widget @@ -60,6 +61,45 @@ ) IN_PROGRESS_STATUS = UpgradeOperation.CANCELLABLE_STATUS +_STATUS_CONFIG = { + "unconfirmed": {"label": _("Unconfirmed"), "class": "ow-status-grey"}, + "in_progress": {"label": _("In Progress"), "class": "ow-status-warning"}, + "success": {"label": _("Success"), "class": "ow-status-success"}, + "failed": {"label": _("Failed"), "class": "ow-status-error"}, + "manually_confirmed": { + "label": _("Manually Confirmed"), + "class": "ow-status-success", + }, + "invalid": {"label": _("Invalid"), "class": "ow-status-error"}, +} + +_BUILD_STATUS_CONFIG = { + "analyzing": ("warning", _("Analyzing")), + "success": ("success", _("Success")), + "failed": ("error", _("Failed")), + "invalid": ("error", _("Invalid")), + "manually_confirmed": ("success", _("Manually Confirmed")), +} + +_FAILURE_REASON_TEXT = { + "unsupported_format": _( + "Both fwtool and DTB scan were unable to extract metadata. " + "Fill in the Device Metadata fields below manually." + ), + "out_of_memory": _("Decompression exceeded the configured size or ratio limit."), + "invalid_file": _("This file was rejected as an invalid firmware image."), + "timeout": _("Metadata extraction timed out. Re-upload the image to try again."), +} + + +def _extraction_status_badge(status): + cfg = _STATUS_CONFIG.get(status, {"label": status, "class": "ow-status-grey"}) + return format_html( + '{}', + cfg["class"], + cfg["label"], + ) + class BaseAdmin(MultitenantAdminMixin, TimeReadonlyAdminMixin, admin.ModelAdmin): save_on_top = True @@ -82,6 +122,11 @@ class CategoryAdmin(BaseVersionAdmin): class FirmwareImageInline(TimeReadonlyAdminMixin, admin.StackedInline): model = FirmwareImage extra = 0 + readonly_fields = ["created", "modified", "extraction_status_display"] + + @admin.display(description=_("Extraction Status")) + def extraction_status_display(self, obj): + return _extraction_status_badge(obj.extraction_status) class Media: extra = "" if getattr(settings, "DEBUG", False) else ".min" @@ -108,6 +153,139 @@ def has_change_permission(self, request, obj=None): return True +@admin.register(FirmwareImage) +class FirmwareImageAdmin(BaseAdmin): + list_display = [ + "__str__", + "build", + "type", + "extraction_status_display", + "created", + "modified", + ] + list_filter = ["extraction_status", "build__category"] + search_fields = ["board", "target", "type"] + ordering = ["-created"] + readonly_fields = [ + "created", + "modified", + "extraction_status_display", + "failure_reason_display", + "extraction_log_display", + "source", + ] + fieldsets = [ + ( + None, + { + "fields": ["build", "file", "type", "created", "modified"], + }, + ), + ( + _("Extraction Status"), + { + "fields": [ + "extraction_status_display", + "failure_reason_display", + "extraction_log_display", + ], + "description": _( + "Metadata is extracted automatically on upload using fwtool " + "(primary) and DTB scan (fallback). " + "If both fail, fill in the Device Metadata fields below manually." + ), + }, + ), + ( + _("Device Metadata"), + { + "fields": [ + "board", + "compatible", + "target", + "fw_version", + "compat_version", + "source", + ], + }, + ), + ] + + def get_readonly_fields(self, request, obj=None): + readonly = list(self.readonly_fields) + if obj: + status = obj.extraction_status + if status in ( + FirmwareImage.STATUS_IN_PROGRESS, + FirmwareImage.STATUS_INVALID, + FirmwareImage.STATUS_MANUALLY_CONFIRMED, + ): + readonly += [ + "board", + "compatible", + "target", + "fw_version", + "compat_version", + ] + elif status == FirmwareImage.STATUS_SUCCESS: + if obj.source == "dtb": + readonly += ["board", "compatible", "compat_version"] + else: + readonly += [ + "board", + "compatible", + "target", + "fw_version", + "compat_version", + ] + return readonly + + @admin.display(description=_("Extraction Status")) + def extraction_status_display(self, obj): + return _extraction_status_badge(obj.extraction_status) + + @admin.display(description=_("Failure Reason")) + def failure_reason_display(self, obj): + if not obj.failure_reason: + return "-" + return _FAILURE_REASON_TEXT.get(obj.failure_reason, obj.failure_reason) + + @admin.display(description=_("Extraction Log")) + def extraction_log_display(self, obj): + if not obj.extraction_log: + return "-" + return format_html( + '
{}
', + obj.extraction_log, + ) + + def save_model(self, request, obj, form, change): + if change and "file" in form.changed_data: + obj.extraction_status = FirmwareImage.STATUS_UNCONFIRMED + obj.extraction_log = "" + obj.failure_reason = "" + obj.board = "" + obj.compatible = [] + obj.target = "" + obj.fw_version = "" + obj.source = "" + super().save_model(request, obj, form, change) + extract_firmware_metadata.delay(obj.pk) + return + if change: + if obj.extraction_status == FirmwareImage.STATUS_FAILED: + metadata_fields = ["board", "target", "fw_version"] + if any(f in form.changed_data for f in metadata_fields): + obj.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED + obj.source = "manual" + elif ( + obj.extraction_status == FirmwareImage.STATUS_SUCCESS + and obj.source == "dtb" + ): + obj.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED + super().save_model(request, obj, form, change) + + class BatchUpgradeConfirmationForm(forms.ModelForm): upgrade_options = forms.JSONField(widget=FirmwareSchemaWidget(), required=False) build = forms.ModelChoiceField( @@ -172,7 +350,14 @@ class Media: @admin.register(load_model("Build")) class BuildAdmin(BaseAdmin): - list_display = ["__str__", "organization", "category", "created", "modified"] + list_display = [ + "__str__", + "organization", + "category", + "build_status_display", + "created", + "modified", + ] list_filter = [CategoryOrganizationFilter, CategoryFilter] list_select_related = ["category", "category__organization"] search_fields = ["category__name", "version", "os"] @@ -190,6 +375,15 @@ def organization(self, obj): organization.short_description = _("organization") + @admin.display(description=_("Extraction status")) + def build_status_display(self, obj): + css, label = _BUILD_STATUS_CONFIG.get(obj.status, ("grey", obj.status)) + return format_html( + '{}', + css, + label, + ) + @admin.action( description=_("Mass-upgrade devices related to the selected build"), permissions=["change"], diff --git a/openwisp_firmware_upgrader/apps.py b/openwisp_firmware_upgrader/apps.py index ce76c7187..50b7f31e6 100644 --- a/openwisp_firmware_upgrader/apps.py +++ b/openwisp_firmware_upgrader/apps.py @@ -29,6 +29,7 @@ def ready(self, *args, **kwargs): self.connect_device_signals() self.connect_upgrade_signals() self.connect_delete_signals() + self.connect_metadata_signals() def register_menu_groups(self): register_menu_group( @@ -54,6 +55,12 @@ def register_menu_groups(self): "name": "changelist", "icon": "ow-mass-upgrade", }, + 4: { + "label": _("Firmware Images"), + "model": get_model_name(self.label, "FirmwareImage"), + "name": "changelist", + "icon": "ow-firmware", + }, }, "icon": "ow-firmware", }, @@ -116,5 +123,14 @@ def connect_delete_signals(self): dispatch_uid="organization.pre_delete.firmware_files", ) + def connect_metadata_signals(self): + FirmwareImage = load_model("firmware_upgrader", "FirmwareImage") + + post_save.connect( + FirmwareImage.trigger_metadata_extraction, + sender=FirmwareImage, + dispatch_uid="firmware_image.trigger_metadata_extraction", + ) + del ApiAppConfig diff --git a/openwisp_firmware_upgrader/base/models.py b/openwisp_firmware_upgrader/base/models.py index 8bfe6cab2..9053780d3 100644 --- a/openwisp_firmware_upgrader/base/models.py +++ b/openwisp_firmware_upgrader/base/models.py @@ -9,9 +9,11 @@ from django.core.validators import MaxValueValidator from django.db import models, transaction from django.db.models import Q +from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ +from openwisp_notifications.signals import notify from private_storage.fields import PrivateFileField from openwisp_controller.connection.exceptions import NoWorkingDeviceConnectionError @@ -38,6 +40,7 @@ batch_upgrade_operation, create_all_device_firmwares, create_device_firmware, + extract_firmware_metadata, upgrade_firmware, ) from ..utils import ( @@ -145,6 +148,28 @@ class AbstractBuild(TimeStampedEditableModel): ), ) + BUILD_STATUS_ANALYZING = "analyzing" + BUILD_STATUS_SUCCESS = "success" + BUILD_STATUS_FAILED = "failed" + BUILD_STATUS_INVALID = "invalid" + BUILD_STATUS_MANUALLY_CONFIRMED = "manually_confirmed" + + BUILD_STATUS_CHOICES = [ + (BUILD_STATUS_ANALYZING, _("Analyzing")), + (BUILD_STATUS_SUCCESS, _("Success")), + (BUILD_STATUS_FAILED, _("Failed")), + (BUILD_STATUS_INVALID, _("Invalid")), + (BUILD_STATUS_MANUALLY_CONFIRMED, _("Manually Confirmed")), + ] + + status = models.CharField( + _("extraction status"), + max_length=20, + choices=BUILD_STATUS_CHOICES, + default=BUILD_STATUS_ANALYZING, + db_index=True, + ) + class Meta: abstract = True verbose_name = _("Firmware Build") @@ -185,6 +210,20 @@ def batch_upgrade( self, firmwareless, upgrade_options=None, group=None, location=None ): upgrade_options = upgrade_options or {} + FirmwareImage = load_model("FirmwareImage") + unconfirmed = self.firmwareimage_set.exclude( + extraction_status__in=[ + FirmwareImage.STATUS_SUCCESS, + FirmwareImage.STATUS_MANUALLY_CONFIRMED, + ] + ) + if unconfirmed.exists(): + raise ValidationError( + _( + "All firmware images must have confirmed metadata " + "before a mass upgrade can be scheduled" + ) + ) # Check if there are any devices to upgrade with the given filters dry_run_result = load_model("BatchUpgradeOperation").dry_run( build=self, group=group, location=location @@ -256,6 +295,65 @@ def _find_firmwareless_devices(self, boards=None, group=None, location=None): qs = qs.filter(devicelocation__location=location) return qs.order_by("-created") + def _update_extraction_status(self): + FirmwareImage = load_model("FirmwareImage") + statuses = set( + FirmwareImage.objects.filter(build_id=self.pk).values_list( + "extraction_status", flat=True + ) + ) + if not statuses: + return + analyzing = {FirmwareImage.STATUS_UNCONFIRMED, FirmwareImage.STATUS_IN_PROGRESS} + if statuses & analyzing: + new_status = self.BUILD_STATUS_ANALYZING + elif FirmwareImage.STATUS_INVALID in statuses: + new_status = self.BUILD_STATUS_INVALID + elif FirmwareImage.STATUS_FAILED in statuses: + new_status = self.BUILD_STATUS_FAILED + elif FirmwareImage.STATUS_MANUALLY_CONFIRMED in statuses: + new_status = self.BUILD_STATUS_MANUALLY_CONFIRMED + else: + new_status = self.BUILD_STATUS_SUCCESS + if new_status == self.status: + return + old_status = self.status + load_model("Build").objects.filter(pk=self.pk).update(status=new_status) + self.status = new_status + if old_status == self.BUILD_STATUS_ANALYZING: + self._notify_extraction_complete(new_status) + + def _notify_extraction_complete(self, new_status): + level = ( + "info" + if new_status + in ( + self.BUILD_STATUS_SUCCESS, + self.BUILD_STATUS_MANUALLY_CONFIRMED, + ) + else "warning" + ) + status_display = dict(self.BUILD_STATUS_CHOICES)[new_status] + try: + admin_url = reverse( + "admin:firmware_upgrader_build_change", + args=[str(self.pk)], + ) + notify.send( + sender=self, + type="generic_message", + level=level, + url=admin_url, + target=self, + message=( + f"Metadata extraction for build " + f'"{self}" ' + f"completed with status: {status_display}." + ), + ) + except Exception: + logger.exception("Failed to send build extraction completion notification") + def get_build_directory(instance, filename): build_pk = str(instance.build.pk) @@ -282,6 +380,56 @@ class AbstractFirmwareImage(TimeStampedEditableModel): ), ) + STATUS_UNCONFIRMED = "unconfirmed" + STATUS_IN_PROGRESS = "in_progress" + STATUS_SUCCESS = "success" + STATUS_FAILED = "failed" + STATUS_MANUALLY_CONFIRMED = "manually_confirmed" + STATUS_INVALID = "invalid" + LOCKED_STATUSES = (STATUS_SUCCESS, STATUS_MANUALLY_CONFIRMED) + + FAILURE_UNSUPPORTED = "unsupported_format" + FAILURE_OOM = "out_of_memory" + FAILURE_INVALID = "invalid_file" + FAILURE_TIMEOUT = "timeout" + + EXTRACTION_STATUS_CHOICES = [ + (STATUS_UNCONFIRMED, _("Unconfirmed")), + (STATUS_IN_PROGRESS, _("In Progress")), + (STATUS_SUCCESS, _("Success")), + (STATUS_FAILED, _("Failed")), + (STATUS_MANUALLY_CONFIRMED, _("Manually Confirmed")), + (STATUS_INVALID, _("Invalid")), + ] + + FAILURE_REASON_CHOICES = [ + (FAILURE_UNSUPPORTED, _("Unsupported format")), + (FAILURE_OOM, _("Out of memory")), + (FAILURE_INVALID, _("Invalid file")), + (FAILURE_TIMEOUT, _("Extraction timed out")), + ] + + extraction_status = models.CharField( + _("extraction status"), + max_length=20, + choices=EXTRACTION_STATUS_CHOICES, + default=STATUS_UNCONFIRMED, + db_index=True, + ) + extraction_log = models.TextField(blank=True) + failure_reason = models.CharField( + max_length=20, + choices=FAILURE_REASON_CHOICES, + blank=True, + default="", + ) + board = models.CharField(max_length=200, blank=True) + compatible = models.JSONField(default=list, blank=True) + target = models.CharField(max_length=100, blank=True) + fw_version = models.CharField(_("firmware version"), max_length=50, blank=True) + compat_version = models.CharField(max_length=10, blank=True) + source = models.CharField(max_length=20, blank=True) + class Meta: abstract = True verbose_name = _("Firmware Image") @@ -299,12 +447,9 @@ def boards(self): def clean(self): self._clean_type() + self._validate_locked() self._validate_file_header() self._validate_rootfs() - try: - self.boards - except KeyError: - raise ValidationError({"type": "Could not find boards for this type"}) def delete(self, *args, **kwargs): super().delete(*args, **kwargs) @@ -342,15 +487,52 @@ def _remove_file(cls, file_path): return True def _clean_type(self): - """ - auto determine type if missing - """ if self.type: return filename = self.file.name - # removes leading prefix self.type = "-".join(filename.split("-")[1:]) + @classmethod + def trigger_metadata_extraction(cls, instance, created, **kwargs): + if not created: + return + Build = load_model("Build") + Build.objects.filter(pk=instance.build_id).update( + status=Build.BUILD_STATUS_ANALYZING + ) + transaction.on_commit(lambda: extract_firmware_metadata.delay(str(instance.pk))) + + def _validate_locked(self): + if not self.pk or self.extraction_status not in self.LOCKED_STATUSES: + return + original = ( + self.__class__.objects.filter(pk=self.pk) + .values( + "board", + "compatible", + "target", + "fw_version", + "compat_version", + "source", + ) + .first() + ) + if not original: + return + for field in ( + "board", + "compatible", + "target", + "fw_version", + "compat_version", + "source", + ): + original_val = original[field] + if original_val and getattr(self, field) != original_val: + raise ValidationError( + _("Metadata fields are read-only after confirmation.") + ) + def _validate_file_header(self): if not self.file: return @@ -449,6 +631,16 @@ class Meta: def clean(self): if not hasattr(self, "image") or not hasattr(self, "device"): return + if self.image.extraction_status not in self.image.LOCKED_STATUSES: + raise ValidationError( + { + "image": _( + "This firmware image's metadata has not been confirmed yet. " + "Metadata extraction must complete successfully " + "before it can be used for upgrades." + ) + } + ) if ( self.image.build.category.organization is not None and self.image.build.category.organization != self.device.organization @@ -567,9 +759,15 @@ def auto_add_device_firmware_to_device(cls, instance, created, **kwargs): @classmethod def auto_create_device_firmwares(cls, instance, created, **kwargs): if created: - transaction.on_commit( - partial(create_all_device_firmwares.delay, instance.pk) - ) + return + if instance.extraction_status not in ( + instance.STATUS_SUCCESS, + instance.STATUS_MANUALLY_CONFIRMED, + ): + return + transaction.on_commit( + partial(create_all_device_firmwares.delay, str(instance.pk)) + ) @classmethod def get_image_queryset_for_device(cls, device, device_firmware=None): diff --git a/openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py b/openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py new file mode 100644 index 000000000..9a13e189d --- /dev/null +++ b/openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py @@ -0,0 +1,100 @@ +# Generated by Django 5.2.13 on 2026-05-22 16:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("firmware_upgrader", "0017_alter_batchupgradeoperation_status"), + ] + + operations = [ + migrations.AddField( + model_name="build", + name="status", + field=models.CharField( + choices=[ + ("analyzing", "Analyzing"), + ("success", "Success"), + ("failed", "Failed"), + ("invalid", "Invalid"), + ("manually_confirmed", "Manually Confirmed"), + ], + db_index=True, + default="analyzing", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200), + ), + migrations.AddField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField(blank=True, max_length=10), + ), + migrations.AddField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_status", + field=models.CharField( + choices=[ + ("unconfirmed", "Unconfirmed"), + ("in_progress", "In Progress"), + ("success", "Success"), + ("failed", "Failed"), + ("manually_confirmed", "Manually Confirmed"), + ("invalid", "Invalid"), + ], + db_index=True, + default="unconfirmed", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="fw_version", + field=models.CharField( + blank=True, max_length=50, verbose_name="firmware version" + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20), + ), + migrations.AddField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100), + ), + ] diff --git a/openwisp_firmware_upgrader/tasks.py b/openwisp_firmware_upgrader/tasks.py index 38ad83bdd..a11b7bf35 100644 --- a/openwisp_firmware_upgrader/tasks.py +++ b/openwisp_firmware_upgrader/tasks.py @@ -4,12 +4,16 @@ from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded from django.core.exceptions import ObjectDoesNotExist +from django.urls import reverse from django.utils.translation import gettext_lazy as _ +from openwisp_notifications.signals import notify from openwisp_utils.tasks import OpenwispCeleryTask from . import settings as app_settings from .exceptions import RecoverableFailure +from .extractors.exceptions import DecompressionLimitExceeded, UnsupportedImageError +from .extractors.openwrt import OpenWrtMetadataExtractor from .swapper import load_model logger = logging.getLogger(__name__) @@ -97,3 +101,151 @@ def delete_firmware_files(files_to_delete): FirmwareImage = load_model("FirmwareImage") for file_path in files_to_delete: FirmwareImage._remove_file(file_path) + + +def _compat_blocks_pairing(compat_version): + try: + major, minor = (int(x) for x in str(compat_version).split(".")) + return (major, minor) > (1, 0) + except (ValueError, AttributeError, TypeError): + return False + + +@shared_task(bind=True, soft_time_limit=app_settings.TASK_TIMEOUT) +def extract_firmware_metadata(self, image_pk): + FirmwareImage = load_model("FirmwareImage") + + try: + image = FirmwareImage.objects.get(pk=image_pk) + except FirmwareImage.DoesNotExist: + logger.warning( + "extract_firmware_metadata: FirmwareImage pk=%s not found, skipping", + image_pk, + ) + return + + updated = FirmwareImage.objects.filter( + pk=image_pk, + extraction_status=FirmwareImage.STATUS_UNCONFIRMED, + ).update(extraction_status=FirmwareImage.STATUS_IN_PROGRESS) + if not updated: + return + log_lines = [f"[+] Analyzing: {image.file.name}"] + update = {} + + try: + extractor_class = getattr( + image.build.category.__class__, + "metadata_extractor_class", + OpenWrtMetadataExtractor, + ) + meta = extractor_class(image.file.path).extract() + log_lines.append("[+] extraction: success") + update = { + "extraction_status": FirmwareImage.STATUS_SUCCESS, + "extraction_log": "\n".join(log_lines), + "board": meta.get("model", ""), + "compatible": meta.get("compatible", []), + "target": meta.get("target", ""), + "fw_version": meta.get("version", ""), + "compat_version": meta.get("compat_version", ""), + "source": meta.get("source", "fwtool"), + } + + except SoftTimeLimitExceeded: + log_lines.append(f"[!] Task timed out after {app_settings.TASK_TIMEOUT}s.") + update = { + "extraction_status": FirmwareImage.STATUS_FAILED, + "failure_reason": FirmwareImage.FAILURE_TIMEOUT, + "extraction_log": "\n".join(log_lines), + } + logger.warning( + "extract_firmware_metadata: soft time limit exceeded for pk=%s", + image_pk, + ) + + except DecompressionLimitExceeded as exc: + log_lines.append(f"[!] {exc}") + update = { + "extraction_status": FirmwareImage.STATUS_FAILED, + "failure_reason": FirmwareImage.FAILURE_OOM, + "extraction_log": "\n".join(log_lines), + } + logger.warning( + "extract_firmware_metadata: decompression limit exceeded for pk=%s - %s", + image_pk, + exc, + ) + + except UnsupportedImageError as exc: + log_lines.append(f"[-] fwtool: {exc}") + log_lines.append("[!] Extraction failed. Manual input required.") + update = { + "extraction_status": FirmwareImage.STATUS_FAILED, + "failure_reason": FirmwareImage.FAILURE_UNSUPPORTED, + "extraction_log": "\n".join(log_lines), + } + logger.warning( + "extract_firmware_metadata: unsupported image pk=%s - %s", + image_pk, + exc, + ) + + except Exception: + log_lines.append("[!] Unexpected error during extraction.") + update = { + "extraction_status": FirmwareImage.STATUS_INVALID, + "failure_reason": FirmwareImage.FAILURE_INVALID, + "extraction_log": "\n".join(log_lines), + } + logger.exception( + "extract_firmware_metadata: unhandled exception for pk=%s", + image_pk, + ) + + FirmwareImage.objects.filter(pk=image_pk).update(**update) + + if update.get("extraction_status") not in ( + FirmwareImage.STATUS_SUCCESS, + FirmwareImage.STATUS_IN_PROGRESS, + ): + try: + image = FirmwareImage.objects.select_related( + "build", "build__category" + ).get(pk=image_pk) + admin_url = reverse( + "admin:firmware_upgrader_build_change", args=[str(image.build_id)] + ) + notify.send( + sender=image, + type="generic_message", + level="error", + url=admin_url, + target=image.build, + message=( + f'Metadata extraction failed for {image}: ' + f'{update.get("failure_reason", "unknown error")}. ' + f"You can manually enter metadata or re-upload the image." + ), + ) + except Exception: + logger.exception("Failed to send extraction failure notification") + + try: + fresh = FirmwareImage.objects.select_related("build").get(pk=image_pk) + fresh.build._update_extraction_status() + except Exception: + logger.exception( + "Failed to update build extraction status for image %s", image_pk + ) + + if update.get("extraction_status") == FirmwareImage.STATUS_SUCCESS: + compat = update.get("compat_version", "") + if _compat_blocks_pairing(compat): + logger.info( + "Auto-pairing skipped for image %s: compat_version %s > 1.0", + image_pk, + compat, + ) + else: + create_all_device_firmwares.delay(str(image_pk)) diff --git a/openwisp_firmware_upgrader/tests/base.py b/openwisp_firmware_upgrader/tests/base.py index 79614f3b2..fe32466bb 100644 --- a/openwisp_firmware_upgrader/tests/base.py +++ b/openwisp_firmware_upgrader/tests/base.py @@ -77,6 +77,8 @@ def _create_build(self, **kwargs): def _create_firmware_image(self, **kwargs): opts = dict(type=self.TPLINK_4300_IMAGE) opts.update(kwargs) + if "extraction_status" not in opts: + opts["extraction_status"] = FirmwareImage.STATUS_SUCCESS category_opts = {} if "organization" in opts: category_opts["organization"] = opts.pop("organization") diff --git a/openwisp_firmware_upgrader/tests/test_admin.py b/openwisp_firmware_upgrader/tests/test_admin.py index 344218a01..5e546b947 100644 --- a/openwisp_firmware_upgrader/tests/test_admin.py +++ b/openwisp_firmware_upgrader/tests/test_admin.py @@ -21,6 +21,7 @@ DeviceFirmwareForm, DeviceFirmwareInline, DeviceUpgradeOperationInline, + FirmwareImageAdmin, FirmwareImageInline, admin, ) @@ -548,8 +549,7 @@ def test_deactivated_firmware_image_inline(self): # is displayed as readonly in the admin interface. self.assertContains( response, - '
Test Category v0.1:' - " TP-Link WDR4300 v1 (OpenWrt 19.07 and later)
", + "Test Category v0.1: TP-Link WDR4300 v1 (OpenWrt 19.07 and later)", ) self.assertNotContains( response, @@ -1069,6 +1069,143 @@ def test_device_upgrade_operation_inline_delete_mixed_statuses(self): self.assertIn("disabled", in_progress_delete_input) self.assertLess(failed_index, in_progress_index) + def test_firmware_image_readonly_fields_in_progress(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_IN_PROGRESS + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + def test_firmware_image_readonly_fields_success_fwtool(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.source = "fwtool" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + def test_firmware_image_readonly_fields_success_dtb(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.source = "dtb" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + for field in ["target", "fw_version"]: + with self.subTest(field=field): + self.assertNotIn(field, readonly) + + def test_firmware_image_readonly_fields_failed(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_FAILED + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertNotIn(field, readonly) + + def test_firmware_image_readonly_fields_manually_confirmed(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + def test_firmware_image_readonly_fields_invalid(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_INVALID + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + @mock.patch("openwisp_firmware_upgrader.admin.extract_firmware_metadata") + def test_firmware_image_save_model_file_change_triggers_extraction(self, mock_task): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.board = "Old Board" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + form = mock.MagicMock() + form.changed_data = ["file"] + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + self.assertEqual(fw.board, "") + mock_task.delay.assert_called_once_with(fw.pk) + + @mock.patch("openwisp_firmware_upgrader.admin.extract_firmware_metadata") + def test_firmware_image_save_model_failed_to_manually_confirmed(self, mock_task): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_FAILED + fw.board = "Generic x86" + fw.target = "x86/64" + fw.fw_version = "23.05.5" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + form = mock.MagicMock() + form.changed_data = ["board", "target"] + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_MANUALLY_CONFIRMED) + self.assertEqual(fw.source, "manual") + mock_task.delay.assert_not_called() + + @mock.patch("openwisp_firmware_upgrader.admin.extract_firmware_metadata") + def test_firmware_image_save_model_dtb_success_to_manually_confirmed( + self, mock_task + ): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.source = "dtb" + fw.board = "Xunlong Orange Pi Zero" + fw.target = "" + fw.fw_version = "" + fw.save() + fw.target = "sunxi/cortexa7" + fw.fw_version = "23.05.5" + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + form = mock.MagicMock() + form.changed_data = ["target", "fw_version"] + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_MANUALLY_CONFIRMED) + self.assertEqual(fw.source, "dtb") + mock_task.delay.assert_not_called() + class TestAdminTransaction( BaseTestAdmin, AdminActionPermTestMixin, TransactionTestCase diff --git a/openwisp_firmware_upgrader/tests/test_api.py b/openwisp_firmware_upgrader/tests/test_api.py index c0a8724c9..cdd9023a4 100644 --- a/openwisp_firmware_upgrader/tests/test_api.py +++ b/openwisp_firmware_upgrader/tests/test_api.py @@ -326,7 +326,7 @@ def test_api_batch_upgrade(self): self.assertEqual(BatchUpgradeOperation.objects.count(), 0) with self.subTest("Existing build"): url = reverse("upgrader:api_build_batch_upgrade", args=[build.pk]) - with self.assertNumQueries(10): + with self.assertNumQueries(11): r = self.client.post(url) self.assertEqual(BatchUpgradeOperation.objects.count(), 1) batch = BatchUpgradeOperation.objects.first() @@ -416,7 +416,7 @@ def test_api_shared_build_batch_upgrade(self): with self.subTest( "Test superuser can mass upgrade shared build with upgrade_all" ): - with self.assertNumQueries(8): + with self.assertNumQueries(9): response = self.client.post(path, {"upgrade_all": True}) self.assertEqual(response.status_code, 201) batch = BatchUpgradeOperation.objects.first() @@ -1159,7 +1159,7 @@ def test_firmware_create(self): "file": self._get_simpleuploadedfile(self.FAKE_IMAGE_PATH2), "type": self.TPLINK_4300_IMAGE, } - with self.assertNumQueries(9): + with self.assertNumQueries(10): r = self.client.post(url, data) self.assertEqual(r.status_code, 201) self.assertEqual(FirmwareImage.objects.count(), 1) diff --git a/openwisp_firmware_upgrader/tests/test_extractors.py b/openwisp_firmware_upgrader/tests/test_extractors.py index e462c528e..081ce5a4a 100644 --- a/openwisp_firmware_upgrader/tests/test_extractors.py +++ b/openwisp_firmware_upgrader/tests/test_extractors.py @@ -8,6 +8,8 @@ ExtractionError, UnsupportedImageError, ) +from ..extractors.openwrt import OpenWrtMetadataExtractor +from ..upgraders.openwrt import OpenWrt class ConcreteSuccessExtractor(BaseMetadataExtractor): @@ -85,3 +87,6 @@ def test_image_path_stored_as_string(self): extractor = ConcreteSuccessExtractor(Path("/fake/path.bin")) self.assertIsInstance(extractor.image_path, str) self.assertEqual(extractor.image_path, "/fake/path.bin") + + def test_metadata_extractor_class_is_openwrt_extractor(self): + self.assertIs(OpenWrt.metadata_extractor_class, OpenWrtMetadataExtractor) diff --git a/openwisp_firmware_upgrader/tests/test_models.py b/openwisp_firmware_upgrader/tests/test_models.py index c421c58a4..caa99587d 100644 --- a/openwisp_firmware_upgrader/tests/test_models.py +++ b/openwisp_firmware_upgrader/tests/test_models.py @@ -14,7 +14,6 @@ from openwisp_utils.tests import capture_any_output from .. import settings as app_settings -from ..hardware import FIRMWARE_IMAGE_MAP, REVERSE_FIRMWARE_IMAGE_MAP from ..swapper import load_model from ..tasks import upgrade_firmware from .base import TestUpgraderMixin @@ -36,7 +35,7 @@ class TestModels(TestUpgraderMixin, TestCase): app_label = "openwisp_firmware_upgrader" os = "OpenWrt 19.07-SNAPSHOT r11061-6ffd4d8a4d" - image_type = REVERSE_FIRMWARE_IMAGE_MAP["YunCore XD3200"] + image_type = "ar71xx-generic-xd3200-squashfs-sysupgrade.bin" def test_category_str(self): c = Category(name="WiFi Hotspot") @@ -195,22 +194,6 @@ def test_device_fw_uninstalled_without_credentials_rejected(self): device_fw.full_clean() self.assertIn("related connection", str(ctx.exception)) - def test_invalid_board(self): - image = FIRMWARE_IMAGE_MAP[self.TPLINK_4300_IMAGE] - boards = image["boards"] - del image["boards"] - err = None - try: - self._create_firmware_image() - except ValidationError as e: - err = e - image["boards"] = boards - if err: - self.assertIn("type", err.message_dict) - self.assertIn("not find boards", str(err)) - else: - self.fail("ValidationError not raised") - def test_custom_image_type_present(self): t = FirmwareImage._meta.get_field("type") custom_images = app_settings.CUSTOM_OPENWRT_IMAGES @@ -584,6 +567,168 @@ def test_upgrade_operation_str(self): expected = f"{uo.device} ({timezone.localtime(uo.created).strftime('%Y-%m-%d %H:%M:%S')})" self.assertEqual(str(uo), expected) + def test_firmware_image_rejects_invalid_file_headers(self): + build = self._get_build() + invalid_headers = [ + (b"\xff\xd8\xff" + b"\x00" * 20, "JPEG"), + (b"%PDF" + b"\x00" * 20, "PDF"), + (b"\x89PNG\r\n\x1a\n" + b"\x00" * 20, "PNG"), + (b"PK\x03\x04" + b"\x00" * 20, "ZIP"), + (b"\x7fELF" + b"\x00" * 20, "ELF"), + ] + for content, label in invalid_headers: + with self.subTest(file_type=label): + fw = FirmwareImage( + build=build, + type=self.TPLINK_4300_IMAGE, + file=SimpleUploadedFile( + name=f"openwrt-{self.TPLINK_4300_IMAGE}", + content=content, + content_type="application/octet-stream", + ), + ) + try: + fw.full_clean() + except ValidationError as e: + self.assertIn("file", e.message_dict) + else: + self.fail(f"ValidationError not raised for {label} file") + + def test_firmware_image_rejects_rootfs_image(self): + build = self._get_build() + fw = FirmwareImage( + build=build, + type=self.TPLINK_4300_IMAGE, + file=SimpleUploadedFile( + name="ath79-generic-tplink_tl-wdr4300-v1-squashfs-rootfs.img", + content=b"\x00" * 100, + content_type="application/octet-stream", + ), + ) + try: + fw.full_clean() + except ValidationError as e: + self.assertIn("file", e.message_dict) + else: + self.fail("ValidationError not raised for rootfs image") + + def test_batch_upgrade_blocked_with_unconfirmed_images(self): + env = self._create_upgrade_env() + build = env["build2"] + FirmwareImage.objects.filter(build=build).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + with self.assertRaises(ValidationError) as ctx: + build.batch_upgrade(firmwareless=False) + self.assertIn("confirmed metadata", str(ctx.exception)) + + def test_batch_upgrade_allowed_when_all_images_confirmed(self): + env = self._create_upgrade_env() + build = env["build2"] + build.firmwareimage_set.update(extraction_status=FirmwareImage.STATUS_SUCCESS) + batch = build.batch_upgrade(firmwareless=False) + self.assertIsNotNone(batch) + + def test_update_extraction_status_all_success(self): + build = self._create_build() + build.status = Build.BUILD_STATUS_ANALYZING + build.save() + image = self._create_firmware_image(build=build) + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.save() + build._update_extraction_status() + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_SUCCESS) + + def test_update_extraction_status_with_failed(self): + build = self._create_build() + build.status = Build.BUILD_STATUS_ANALYZING + build.save() + image = self._create_firmware_image(build=build) + image.extraction_status = FirmwareImage.STATUS_FAILED + image.save() + build._update_extraction_status() + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_FAILED) + + def test_update_extraction_status_analyzing_takes_priority(self): + build = self._create_build() + build.status = Build.BUILD_STATUS_ANALYZING + build.save() + image1 = self._create_firmware_image(build=build) + image1.extraction_status = FirmwareImage.STATUS_FAILED + image1.save() + image2 = self._create_firmware_image( + build=build, type=self.TPLINK_4300_IL_IMAGE + ) + image2.extraction_status = FirmwareImage.STATUS_IN_PROGRESS + image2.save() + build._update_extraction_status() + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_ANALYZING) + + def test_visible_locked_blocks_field_change_on_success(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.board = "TP-Link WDR4300" + image.target = "ath79/generic" + image.source = "fwtool" + image.save() + image.board = "Changed Board" + with self.assertRaises(ValidationError) as ctx: + image._validate_locked() + self.assertIn("read-only", str(ctx.exception)) + + def test_validate_locked_allows_change_when_failed(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_FAILED + image.board = "" + image.save() + image.board = "Manually entered" + image._validate_locked() + + def test_validate_locked_allows_filling_empty_dtb_fields(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.board = "Orange Pi Zero" + image.target = "" + image.source = "dtb" + image.save() + image.target = "sunxi/cortexa7" + image._validate_locked() + + @capture_any_output() + def test_device_firmware_clean_blocks_unconfirmed_image(self): + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + image.refresh_from_db() + device = self._create_device(organization=image.build.category.organization) + self._create_config(device=device) + device_fw = DeviceFirmware() + device_fw.image = image + device_fw.device = device + with self.assertRaises(ValidationError) as ctx: + device_fw.clean() + self.assertIn("image", ctx.exception.message_dict) + + def test_auto_create_device_firmwares_skip_unconfirmed(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_UNCONFIRMED + image.save() + with mock.patch("django.db.transaction.on_commit") as mock_on_commit: + DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) + mock_on_commit.assert_not_called() + + def test_auto_create_device_firmwares_triggers_on_confirmed(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.save() + with mock.patch("django.db.transaction.on_commit") as mock_on_commit: + DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) + mock_on_commit.assert_called_once() + class TestModelsTransaction(TestUpgraderMixin, TransactionTestCase): _mock_updrade = "openwisp_firmware_upgrader.upgraders.openwrt.OpenWrt.upgrade" diff --git a/openwisp_firmware_upgrader/tests/test_tasks.py b/openwisp_firmware_upgrader/tests/test_tasks.py index 4d98d7cd2..4690944b9 100644 --- a/openwisp_firmware_upgrader/tests/test_tasks.py +++ b/openwisp_firmware_upgrader/tests/test_tasks.py @@ -1,3 +1,4 @@ +import uuid from unittest import mock from celery.exceptions import SoftTimeLimitExceeded @@ -6,12 +7,16 @@ from openwisp_utils.tests import capture_any_output from .. import tasks +from ..extractors.exceptions import DecompressionLimitExceeded, UnsupportedImageError from ..swapper import load_model from .base import TestUpgraderMixin BatchUpgradeOperation = load_model("BatchUpgradeOperation") +FirmwareImage = load_model("FirmwareImage") UpgradeOperation = load_model("UpgradeOperation") +_MOCK_EXTRACTOR = "openwisp_firmware_upgrader.tasks.OpenWrtMetadataExtractor" + class TestTasks(TestUpgraderMixin, TransactionTestCase): _mock_upgrade = "openwisp_firmware_upgrader.upgraders.openwrt.OpenWrt.upgrade" @@ -66,3 +71,194 @@ def test_batch_upgrade_operation_resilience(self, mocked_logger, *args): mocked_logger.assert_called_with( f"The BatchUpgradeOperation object with id {batch_id} has been deleted" ) + batch_id = BatchUpgradeOperation().id + tasks.batch_upgrade_operation.run(batch_id=batch_id, firmwareless=False) + mocked_logger.assert_called_with( + f"The BatchUpgradeOperation object with id {batch_id} has been deleted" + ) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_success(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.return_value = { + "model": "TP-Link WDR4300", + "compatible": ["tplink,tl-wdr4300-v1"], + "target": "ath79/generic", + "version": "23.05.5", + "compat_version": "1.0", + "source": "fwtool", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_SUCCESS) + self.assertEqual(image.board, "TP-Link WDR4300") + self.assertEqual(image.target, "ath79/generic") + self.assertEqual(image.source, "fwtool") + self.assertIn("success", image.extraction_log) + self.assertEqual(image.fw_version, "23.05.5") + self.assertEqual(image.compat_version, "1.0") + self.assertEqual(image.compatible, ["tplink,tl-wdr4300-v1"]) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_dtb_fallback(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.return_value = { + "model": "Xunlong Orange Pi Zero", + "compatible": ["xunlong,orangepi-zero"], + "target": "", + "version": "", + "compat_version": "1.0", + "source": "dtb", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_SUCCESS) + self.assertEqual(image.source, "dtb") + self.assertEqual(image.board, "Xunlong Orange Pi Zero") + self.assertEqual(image.target, "") + self.assertEqual(image.compatible, ["xunlong,orangepi-zero"]) + self.assertEqual(image.compat_version, "1.0") + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_unsupported_error(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = UnsupportedImageError( + "armsr image type not supported" + ) + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_FAILED) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_UNSUPPORTED) + self.assertIn("Extraction failed", image.extraction_log) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_decompression_limit(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = DecompressionLimitExceeded( + "exceeded max decompressed size" + ) + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_FAILED) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_OOM) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_timeout(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = SoftTimeLimitExceeded() + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_FAILED) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_TIMEOUT) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_invalid_exception(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = RuntimeError("unexpected") + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_INVALID) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_INVALID) + + @mock.patch("logging.Logger.warning") + def test_extract_firmware_metadata_image_not_found(self, mock_warning): + fake_pk = str(uuid.uuid4()) + tasks.extract_firmware_metadata.run(fake_pk) + mock_warning.assert_called_once() + self.assertIn(fake_pk, mock_warning.call_args.args) + + @mock.patch(_MOCK_EXTRACTOR) + def test_extract_firmware_metadata_skips_non_unconfirmed(self, MockExtractor): + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_IN_PROGRESS + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + MockExtractor.assert_not_called() + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_IN_PROGRESS) + + def test_compat_blocks_pairing_above_1_0(self): + self.assertTrue(tasks._compat_blocks_pairing("1.1")) + self.assertTrue(tasks._compat_blocks_pairing("2.0")) + + def test_compat_blocks_pairing_at_or_below_1_0(self): + self.assertFalse(tasks._compat_blocks_pairing("1.0")) + self.assertFalse(tasks._compat_blocks_pairing("0.9")) + + def test_compat_blocks_pairing_invalid_values(self): + self.assertFalse(tasks._compat_blocks_pairing("")) + self.assertFalse(tasks._compat_blocks_pairing(None)) + self.assertFalse(tasks._compat_blocks_pairing("bad")) + + @mock.patch(_MOCK_EXTRACTOR) + @mock.patch("openwisp_firmware_upgrader.tasks.create_all_device_firmwares") + @capture_any_output() + def test_extract_firmware_metadata_skips_pairing_for_high_compat( + self, mock_create_firmwares, MockExtractor + ): + MockExtractor.return_value.extract.return_value = { + "model": "Test Device", + "compatible": ["test,device"], + "target": "test/target", + "version": "23.05.5", + "compat_version": "2.0", + "source": "fwtool", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + mock_create_firmwares.delay.assert_not_called() + + @mock.patch(_MOCK_EXTRACTOR) + @mock.patch("openwisp_firmware_upgrader.tasks.create_all_device_firmwares") + @capture_any_output() + def test_extract_firmware_metadata_triggers_pairing_for_low_compat( + self, mock_create_firmwares, MockExtractor + ): + MockExtractor.return_value.extract.return_value = { + "model": "Test Device", + "compatible": ["test,device"], + "target": "test/target", + "version": "23.05.5", + "compat_version": "1.0", + "source": "fwtool", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + mock_create_firmwares.delay.assert_called_once_with(str(image.pk)) diff --git a/tests/openwisp2/sample_connection/api/views.py b/tests/openwisp2/sample_connection/api/views.py index fd2207cbc..e8d868e3e 100644 --- a/tests/openwisp2/sample_connection/api/views.py +++ b/tests/openwisp2/sample_connection/api/views.py @@ -14,7 +14,7 @@ DeviceConnectionDetailView as BaseDeviceConnectionDetailView, ) from openwisp_controller.connection.api.views import ( - DeviceConnectionListCreateView as BaseDeviceConnectionListCreateView, + DeviceConnenctionListCreateView as BaseDeviceConnectionListCreateView, ) diff --git a/tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py b/tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py new file mode 100644 index 000000000..07d46c81b --- /dev/null +++ b/tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py @@ -0,0 +1,100 @@ +# Generated by Django 5.2.13 on 2026-05-22 22:08 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_firmware_upgrader", "0004_alter_firmwareimage_file"), + ] + + operations = [ + migrations.AddField( + model_name="build", + name="status", + field=models.CharField( + choices=[ + ("analyzing", "Analyzing"), + ("success", "Success"), + ("failed", "Failed"), + ("invalid", "Invalid"), + ("manually_confirmed", "Manually Confirmed"), + ], + db_index=True, + default="analyzing", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200), + ), + migrations.AddField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField(blank=True, max_length=10), + ), + migrations.AddField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_status", + field=models.CharField( + choices=[ + ("unconfirmed", "Unconfirmed"), + ("in_progress", "In Progress"), + ("success", "Success"), + ("failed", "Failed"), + ("manually_confirmed", "Manually Confirmed"), + ("invalid", "Invalid"), + ], + db_index=True, + default="unconfirmed", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="fw_version", + field=models.CharField( + blank=True, max_length=50, verbose_name="firmware version" + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20), + ), + migrations.AddField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100), + ), + ] From d7a933050847c1f94344f7e8ebd146df73f9f7a8 Mon Sep 17 00:00:00 2001 From: atif09 Date: Tue, 16 Jun 2026 02:17:21 +0530 Subject: [PATCH 2/5] [fix] Addressing review comments #412 - Clear compat_version when the firmware file is replaced - Define Celery enqueue until after the admin save commits - Only mark DTB-backed images as manually confirmed when editable metadata changed - Add translatable verbose names for new metadata fields - Lock metadata based on persisted status not only the incoming one - Only enqueue device pairing on the transition into a confirmed state - Backfill legacy FirmwareImage extraction state before rollout - Removed hard-coded admin route namespace for swappable models - Mark notification text as translatable - Make the warning assertion robust to logger formatting style - Fix hardcoded admin reverse + missing i18n for extraction notifications and metadata field labels Related to #412 --- openwisp_firmware_upgrader/admin.py | 7 ++- openwisp_firmware_upgrader/base/models.py | 42 +++++++++---- .../0019_backfill_extraction_status.py | 30 +++++++++ ...0020_alter_firmwareimage_board_and_more.py | 61 +++++++++++++++++++ openwisp_firmware_upgrader/tasks.py | 15 +++-- .../tests/test_tasks.py | 2 +- 6 files changed, 137 insertions(+), 20 deletions(-) create mode 100644 openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py create mode 100644 openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py diff --git a/openwisp_firmware_upgrader/admin.py b/openwisp_firmware_upgrader/admin.py index 7825e0a13..2e069dd27 100644 --- a/openwisp_firmware_upgrader/admin.py +++ b/openwisp_firmware_upgrader/admin.py @@ -12,6 +12,7 @@ from django.core.exceptions import ValidationError from django.core.paginator import InvalidPage, Paginator from django.core.serializers.json import DjangoJSONEncoder +from django.db import transaction from django.forms.formsets import DELETION_FIELD_NAME from django.shortcuts import redirect from django.template.response import TemplateResponse @@ -268,9 +269,10 @@ def save_model(self, request, obj, form, change): obj.compatible = [] obj.target = "" obj.fw_version = "" + obj.compat_version = "" obj.source = "" super().save_model(request, obj, form, change) - extract_firmware_metadata.delay(obj.pk) + transaction.on_commit(lambda: extract_firmware_metadata.delay(obj.pk)) return if change: if obj.extraction_status == FirmwareImage.STATUS_FAILED: @@ -281,6 +283,9 @@ def save_model(self, request, obj, form, change): elif ( obj.extraction_status == FirmwareImage.STATUS_SUCCESS and obj.source == "dtb" + and any( + field in form.changed_data for field in ["target", "fw_version"] + ) ): obj.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED super().save_model(request, obj, form, change) diff --git a/openwisp_firmware_upgrader/base/models.py b/openwisp_firmware_upgrader/base/models.py index 9053780d3..e52b619d0 100644 --- a/openwisp_firmware_upgrader/base/models.py +++ b/openwisp_firmware_upgrader/base/models.py @@ -12,6 +12,7 @@ from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property +from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ from openwisp_notifications.signals import notify from private_storage.fields import PrivateFileField @@ -335,8 +336,9 @@ def _notify_extraction_complete(self, new_status): ) status_display = dict(self.BUILD_STATUS_CHOICES)[new_status] try: + opts = self.__class__._meta admin_url = reverse( - "admin:firmware_upgrader_build_change", + f"admin:{opts.app_label}_{opts.model_name}_change", args=[str(self.pk)], ) notify.send( @@ -345,10 +347,14 @@ def _notify_extraction_complete(self, new_status): level=level, url=admin_url, target=self, - message=( - f"Metadata extraction for build " - f'"{self}" ' - f"completed with status: {status_display}." + message=format_html( + _( + 'Metadata extraction for build {build} ' + "completed with status: {status}." + ), + url=admin_url, + build=self, + status=status_display, ), ) except Exception: @@ -416,19 +422,20 @@ class AbstractFirmwareImage(TimeStampedEditableModel): default=STATUS_UNCONFIRMED, db_index=True, ) - extraction_log = models.TextField(blank=True) + extraction_log = models.TextField(_("extraction log"), blank=True) failure_reason = models.CharField( + _("failure reason"), max_length=20, choices=FAILURE_REASON_CHOICES, blank=True, default="", ) - board = models.CharField(max_length=200, blank=True) - compatible = models.JSONField(default=list, blank=True) - target = models.CharField(max_length=100, blank=True) + board = models.CharField(_("board"), max_length=200, blank=True) + compatible = models.JSONField(_("compatible"), default=list, blank=True) + target = models.CharField(_("target"), max_length=100, blank=True) fw_version = models.CharField(_("firmware version"), max_length=50, blank=True) - compat_version = models.CharField(max_length=10, blank=True) - source = models.CharField(max_length=20, blank=True) + compat_version = models.CharField(_("compat version"), max_length=10, blank=True) + source = models.CharField(_("source"), max_length=20, blank=True) class Meta: abstract = True @@ -508,6 +515,7 @@ def _validate_locked(self): original = ( self.__class__.objects.filter(pk=self.pk) .values( + "extraction_status", "board", "compatible", "target", @@ -519,6 +527,11 @@ def _validate_locked(self): ) if not original: return + if ( + original["extraction_status"] not in self.LOCKED_STATUSES + and self.extraction_status not in self.LOCKED_STATUSES + ): + return for field in ( "board", "compatible", @@ -760,10 +773,13 @@ def auto_add_device_firmware_to_device(cls, instance, created, **kwargs): def auto_create_device_firmwares(cls, instance, created, **kwargs): if created: return - if instance.extraction_status not in ( + confirmed_statuses = ( instance.STATUS_SUCCESS, instance.STATUS_MANUALLY_CONFIRMED, - ): + ) + if instance.extraction_status not in confirmed_statuses: + return + if getattr(instance, "_original_extraction_status", None) in confirmed_statuses: return transaction.on_commit( partial(create_all_device_firmwares.delay, str(instance.pk)) diff --git a/openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py b/openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py new file mode 100644 index 000000000..e1a4ba8e3 --- /dev/null +++ b/openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py @@ -0,0 +1,30 @@ +from django.db import migrations + + +def backfill_firmware_image_status(apps, schema_editor): + FirmwareImage = apps.get_model("firmware_upgrader", "FirmwareImage") + FirmwareImage.objects.filter(extraction_status="unconfirmed").update( + extraction_status="manually_confirmed", + source="manual", + ) + + +def backfill_build_status(apps, schema_editor): + Build = apps.get_model("firmware_upgrader", "Build") + Build.objects.filter(status="analyzing").update(status="manually_confirmed") + + +class Migration(migrations.Migration): + dependencies = [ + ("firmware_upgrader", "0018_build_status_firmwareimage_board_and_more"), + ] + operations = [ + migrations.RunPython( + backfill_firmware_image_status, + reverse_code=migrations.RunPython.noop, + ), + migrations.RunPython( + backfill_build_status, + reverse_code=migrations.RunPython.noop, + ), + ] diff --git a/openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py b/openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py new file mode 100644 index 000000000..87056a3e5 --- /dev/null +++ b/openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py @@ -0,0 +1,61 @@ +# Generated by Django 5.2.14 on 2026-06-15 20:36 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("firmware_upgrader", "0019_backfill_extraction_status"), + ] + + operations = [ + migrations.AlterField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200, verbose_name="board"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField( + blank=True, max_length=10, verbose_name="compat version" + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list, verbose_name="compatible"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True, verbose_name="extraction log"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + verbose_name="failure reason", + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20, verbose_name="source"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100, verbose_name="target"), + ), + ] diff --git a/openwisp_firmware_upgrader/tasks.py b/openwisp_firmware_upgrader/tasks.py index a11b7bf35..1e9b2fa8e 100644 --- a/openwisp_firmware_upgrader/tasks.py +++ b/openwisp_firmware_upgrader/tasks.py @@ -213,8 +213,10 @@ def extract_firmware_metadata(self, image_pk): image = FirmwareImage.objects.select_related( "build", "build__category" ).get(pk=image_pk) + build_opts = image.build._meta admin_url = reverse( - "admin:firmware_upgrader_build_change", args=[str(image.build_id)] + f"admin:{build_opts.app_label}_{build_opts.model_name}_change", + args=[str(image.build_id)], ) notify.send( sender=image, @@ -222,10 +224,13 @@ def extract_firmware_metadata(self, image_pk): level="error", url=admin_url, target=image.build, - message=( - f'Metadata extraction failed for {image}: ' - f'{update.get("failure_reason", "unknown error")}. ' - f"You can manually enter metadata or re-upload the image." + message=_( + 'Metadata extraction failed for {image}: ' + "{reason}. You can manually enter metadata or re-upload the image." + ).format( + url=admin_url, + image=image, + reason=update.get("failure_reason", "unknown error"), ), ) except Exception: diff --git a/openwisp_firmware_upgrader/tests/test_tasks.py b/openwisp_firmware_upgrader/tests/test_tasks.py index 4690944b9..6b03ee9d1 100644 --- a/openwisp_firmware_upgrader/tests/test_tasks.py +++ b/openwisp_firmware_upgrader/tests/test_tasks.py @@ -195,7 +195,7 @@ def test_extract_firmware_metadata_image_not_found(self, mock_warning): fake_pk = str(uuid.uuid4()) tasks.extract_firmware_metadata.run(fake_pk) mock_warning.assert_called_once() - self.assertIn(fake_pk, mock_warning.call_args.args) + self.assertTrue(any(fake_pk in str(arg) for arg in mock_warning.call_args.args)) @mock.patch(_MOCK_EXTRACTOR) def test_extract_firmware_metadata_skips_non_unconfirmed(self, MockExtractor): From ce27733b20a5e9702c4a933a5601e873c02b0214 Mon Sep 17 00:00:00 2001 From: atif09 Date: Wed, 17 Jun 2026 02:47:43 +0530 Subject: [PATCH 3/5] [tests] Add state machine guards and test for firmware image metadata locking #412 - track original extraction status on load so transition checks work correctly - fix new-object guard in _validate_locked to use _state.adding instead of pk check - lock metadata against bypass where status and fields change in the same save - skip device pairing re-trigger when image was already confirmed before the save - drop defensive getattr now that _original_extraction_status is always set in __init__ - add test to confirm metadata lock can't be bypassed by changing status in the same save - add test to confirm device pairing doesn't re-trigger on re-saves of confirmed images - add test to confirm compat_version is cleared when a firmware file is replaced - add test to confirm dtb status flip only triggers on target/fw_version changes, not other fields Related to #412 --- openwisp_firmware_upgrader/base/models.py | 12 ++-- .../tests/test_admin.py | 40 +++++++++++- .../tests/test_models.py | 27 +++++++- ...0006_alter_firmwareimage_board_and_more.py | 61 +++++++++++++++++++ 4 files changed, 134 insertions(+), 6 deletions(-) create mode 100644 tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py diff --git a/openwisp_firmware_upgrader/base/models.py b/openwisp_firmware_upgrader/base/models.py index e52b619d0..ca8e69103 100644 --- a/openwisp_firmware_upgrader/base/models.py +++ b/openwisp_firmware_upgrader/base/models.py @@ -437,6 +437,10 @@ class AbstractFirmwareImage(TimeStampedEditableModel): compat_version = models.CharField(_("compat version"), max_length=10, blank=True) source = models.CharField(_("source"), max_length=20, blank=True) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._original_extraction_status = self.extraction_status + class Meta: abstract = True verbose_name = _("Firmware Image") @@ -510,7 +514,7 @@ def trigger_metadata_extraction(cls, instance, created, **kwargs): transaction.on_commit(lambda: extract_firmware_metadata.delay(str(instance.pk))) def _validate_locked(self): - if not self.pk or self.extraction_status not in self.LOCKED_STATUSES: + if self._state.adding or not self.pk: return original = ( self.__class__.objects.filter(pk=self.pk) @@ -773,13 +777,13 @@ def auto_add_device_firmware_to_device(cls, instance, created, **kwargs): def auto_create_device_firmwares(cls, instance, created, **kwargs): if created: return - confirmed_statuses = ( + confirmed = ( instance.STATUS_SUCCESS, instance.STATUS_MANUALLY_CONFIRMED, ) - if instance.extraction_status not in confirmed_statuses: + if instance.extraction_status not in confirmed: return - if getattr(instance, "_original_extraction_status", None) in confirmed_statuses: + if instance._original_extraction_status in confirmed: return transaction.on_commit( partial(create_all_device_firmwares.delay, str(instance.pk)) diff --git a/openwisp_firmware_upgrader/tests/test_admin.py b/openwisp_firmware_upgrader/tests/test_admin.py index 5e546b947..447f39027 100644 --- a/openwisp_firmware_upgrader/tests/test_admin.py +++ b/openwisp_firmware_upgrader/tests/test_admin.py @@ -214,6 +214,43 @@ def test_firmware_image_has_change_permission(self): self.assertIs(inline.has_change_permission(request), True) self.assertIs(inline.has_change_permission(request, obj=env["image1a"]), False) + def test_firmware_image_save_model_clears_compat_version_on_file_change(self): + fw = self._create_firmware_image() + FirmwareImage.objects.filter(pk=fw.pk).update( + board="TP-Link WDR4300", + compat_version="21.09", + extraction_status=FirmwareImage.STATUS_SUCCESS, + ) + fw.refresh_from_db() + request = MockRequest() + request.user = User.objects.first() + form = mock.MagicMock() + form.changed_data = ["file"] + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + with mock.patch("django.db.transaction.on_commit"): + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.board, "") + self.assertEqual(fw.compat_version, "") + + def test_firmware_image_save_model_dtb_no_flip_without_changed_fields(self): + fw = self._create_firmware_image() + FirmwareImage.objects.filter(pk=fw.pk).update( + source="dtb", + board="Orange Pi Zero", + extraction_status=FirmwareImage.STATUS_SUCCESS, + ) + fw.refresh_from_db() + request = MockRequest() + request.user = User.objects.first() + form = mock.MagicMock() + form.changed_data = ["board"] + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + with mock.patch("django.db.transaction.on_commit"): + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_SUCCESS) + def test_device_firmware_inline_has_add_permission(self): device_fw = self._create_device_firmware() device = device_fw.device @@ -1157,7 +1194,8 @@ def test_firmware_image_save_model_file_change_triggers_extraction(self, mock_ta fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) form = mock.MagicMock() form.changed_data = ["file"] - fw_admin.save_model(request, fw, form, change=True) + with self.captureOnCommitCallbacks(execute=True): + fw_admin.save_model(request, fw, form, change=True) fw.refresh_from_db() self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) self.assertEqual(fw.board, "") diff --git a/openwisp_firmware_upgrader/tests/test_models.py b/openwisp_firmware_upgrader/tests/test_models.py index caa99587d..d7f1beaeb 100644 --- a/openwisp_firmware_upgrader/tests/test_models.py +++ b/openwisp_firmware_upgrader/tests/test_models.py @@ -697,6 +697,19 @@ def test_validate_locked_allows_filling_empty_dtb_fields(self): image.target = "sunxi/cortexa7" image._validate_locked() + def test_validate_locked_blocks_bypass_via_status_change(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.board = "TP-Link WDR4300" + image.target = "ath79/generic" + image.source = "fwtool" + image.save() + image.extraction_status = FirmwareImage.STATUS_FAILED + image.board = "Tampered board" + with self.assertRaises(ValidationError) as ctx: + image._validate_locked() + self.assertIn("read-only", str(ctx.exception)) + @capture_any_output() def test_device_firmware_clean_blocks_unconfirmed_image(self): image = self._create_firmware_image() @@ -722,13 +735,25 @@ def test_auto_create_device_firmwares_skip_unconfirmed(self): mock_on_commit.assert_not_called() def test_auto_create_device_firmwares_triggers_on_confirmed(self): - image = self._create_firmware_image() + image = self._create_firmware_image( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) image.extraction_status = FirmwareImage.STATUS_SUCCESS image.save() with mock.patch("django.db.transaction.on_commit") as mock_on_commit: DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) mock_on_commit.assert_called_once() + def test_auto_create_device_firmwares_skips_already_confirmed_resave(self): + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_SUCCESS + ) + image = FirmwareImage.objects.get(pk=image.pk) + with mock.patch("django.db.transaction.on_commit") as mock_on_commit: + DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) + mock_on_commit.assert_not_called() + class TestModelsTransaction(TestUpgraderMixin, TransactionTestCase): _mock_updrade = "openwisp_firmware_upgrader.upgraders.openwrt.OpenWrt.upgrade" diff --git a/tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py b/tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py new file mode 100644 index 000000000..6b7557b0b --- /dev/null +++ b/tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py @@ -0,0 +1,61 @@ +# Generated by Django 5.2.15 on 2026-06-17 08:23 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_firmware_upgrader", "0005_build_status_firmwareimage_board_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200, verbose_name="board"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField( + blank=True, max_length=10, verbose_name="compat version" + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list, verbose_name="compatible"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True, verbose_name="extraction log"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + verbose_name="failure reason", + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20, verbose_name="source"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100, verbose_name="target"), + ), + ] From ef601884488dd64a0a5e66c4306fb78e87bdb709 Mon Sep 17 00:00:00 2001 From: atif09 Date: Wed, 17 Jun 2026 19:12:22 +0530 Subject: [PATCH 4/5] [fix] Addressing review comments #412 - Make build status transition atomic to prevent duplicate notifications - Refresh _original_extraction_status in save() to prevent duplicate device pairing enqueue - Fix format string placeholder mismatch in failure notification using format_html - Fix the typo in the imported base view name Related to #412 --- openwisp_firmware_upgrader/base/models.py | 20 ++++++++++++++----- openwisp_firmware_upgrader/tasks.py | 10 ++++++---- .../tests/test_models.py | 1 - .../openwisp2/sample_connection/api/views.py | 2 +- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/openwisp_firmware_upgrader/base/models.py b/openwisp_firmware_upgrader/base/models.py index ca8e69103..6f36012e9 100644 --- a/openwisp_firmware_upgrader/base/models.py +++ b/openwisp_firmware_upgrader/base/models.py @@ -297,6 +297,7 @@ def _find_firmwareless_devices(self, boards=None, group=None, location=None): return qs.order_by("-created") def _update_extraction_status(self): + Build = load_model("Build") FirmwareImage = load_model("FirmwareImage") statuses = set( FirmwareImage.objects.filter(build_id=self.pk).values_list( @@ -316,13 +317,18 @@ def _update_extraction_status(self): new_status = self.BUILD_STATUS_MANUALLY_CONFIRMED else: new_status = self.BUILD_STATUS_SUCCESS - if new_status == self.status: + rows_updated = Build.objects.filter( + pk=self.pk, status=self.BUILD_STATUS_ANALYZING + ).update(status=new_status) + if rows_updated: + self.status = new_status + if new_status != self.BUILD_STATUS_ANALYZING: + self._notify_extraction_complete(new_status) return - old_status = self.status - load_model("Build").objects.filter(pk=self.pk).update(status=new_status) + Build.objects.filter(pk=self.pk).exclude(status=new_status).update( + status=new_status + ) self.status = new_status - if old_status == self.BUILD_STATUS_ANALYZING: - self._notify_extraction_complete(new_status) def _notify_extraction_complete(self, new_status): level = ( @@ -441,6 +447,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._original_extraction_status = self.extraction_status + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + self._original_extraction_status = self.extraction_status + class Meta: abstract = True verbose_name = _("Firmware Image") diff --git a/openwisp_firmware_upgrader/tasks.py b/openwisp_firmware_upgrader/tasks.py index 1e9b2fa8e..c1abf7dee 100644 --- a/openwisp_firmware_upgrader/tasks.py +++ b/openwisp_firmware_upgrader/tasks.py @@ -5,6 +5,7 @@ from celery.exceptions import SoftTimeLimitExceeded from django.core.exceptions import ObjectDoesNotExist from django.urls import reverse +from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ from openwisp_notifications.signals import notify @@ -224,10 +225,11 @@ def extract_firmware_metadata(self, image_pk): level="error", url=admin_url, target=image.build, - message=_( - 'Metadata extraction failed for {image}: ' - "{reason}. You can manually enter metadata or re-upload the image." - ).format( + message=format_html( + _( + 'Metadata extraction failed for {image}: ' + "{reason}. You can manually enter metadata or re-upload the image." + ), url=admin_url, image=image, reason=update.get("failure_reason", "unknown error"), diff --git a/openwisp_firmware_upgrader/tests/test_models.py b/openwisp_firmware_upgrader/tests/test_models.py index d7f1beaeb..80f1acce9 100644 --- a/openwisp_firmware_upgrader/tests/test_models.py +++ b/openwisp_firmware_upgrader/tests/test_models.py @@ -739,7 +739,6 @@ def test_auto_create_device_firmwares_triggers_on_confirmed(self): extraction_status=FirmwareImage.STATUS_UNCONFIRMED ) image.extraction_status = FirmwareImage.STATUS_SUCCESS - image.save() with mock.patch("django.db.transaction.on_commit") as mock_on_commit: DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) mock_on_commit.assert_called_once() diff --git a/tests/openwisp2/sample_connection/api/views.py b/tests/openwisp2/sample_connection/api/views.py index e8d868e3e..fd2207cbc 100644 --- a/tests/openwisp2/sample_connection/api/views.py +++ b/tests/openwisp2/sample_connection/api/views.py @@ -14,7 +14,7 @@ DeviceConnectionDetailView as BaseDeviceConnectionDetailView, ) from openwisp_controller.connection.api.views import ( - DeviceConnenctionListCreateView as BaseDeviceConnectionListCreateView, + DeviceConnectionListCreateView as BaseDeviceConnectionListCreateView, ) From 57d4cfed9349fcbe9d5506dab95a74034a6ca48a Mon Sep 17 00:00:00 2001 From: atif09 Date: Sat, 20 Jun 2026 04:11:54 +0530 Subject: [PATCH 5/5] [feature] Add bulk Re-extract metadata admin action #413 - Added re_extract_metadata bulk action to FirmwareImageAdmin that resets image metadata fields and reschedules extraction tasks - Added tests to cover single and multi-image bulk re-extraction Related to #413 --- openwisp_firmware_upgrader/admin.py | 32 ++++++++ .../tests/test_admin.py | 77 +++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/openwisp_firmware_upgrader/admin.py b/openwisp_firmware_upgrader/admin.py index 2e069dd27..beb7163b1 100644 --- a/openwisp_firmware_upgrader/admin.py +++ b/openwisp_firmware_upgrader/admin.py @@ -1,6 +1,7 @@ import json import logging from datetime import timedelta +from functools import partial import reversion import swapper @@ -167,6 +168,7 @@ class FirmwareImageAdmin(BaseAdmin): list_filter = ["extraction_status", "build__category"] search_fields = ["board", "target", "type"] ordering = ["-created"] + actions = ["re_extract_metadata"] readonly_fields = [ "created", "modified", @@ -290,6 +292,36 @@ def save_model(self, request, obj, form, change): obj.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED super().save_model(request, obj, form, change) + @admin.action( + description=_("Re-extract metadata from selected images"), + permissions=["change"], + ) + def re_extract_metadata(self, request, queryset): + image_pks = list(queryset.values_list("pk", flat=True)) + build_ids = set(queryset.values_list("build_id", flat=True)) + queryset.update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED, + extraction_log="", + failure_reason="", + board="", + compatible=[], + target="", + fw_version="", + compat_version="", + source="", + ) + Build.objects.filter(pk__in=build_ids).update( + status=Build.BUILD_STATUS_ANALYZING + ) + for pk in image_pks: + transaction.on_commit(partial(extract_firmware_metadata.delay, pk)) + self.message_user( + request, + _("Metadata re-extraction scheduled for %(count)d image(s).") + % {"count": len(image_pks)}, + messages.SUCCESS, + ) + class BatchUpgradeConfirmationForm(forms.ModelForm): upgrade_options = forms.JSONField(widget=FirmwareSchemaWidget(), required=False) diff --git a/openwisp_firmware_upgrader/tests/test_admin.py b/openwisp_firmware_upgrader/tests/test_admin.py index 447f39027..50fd7db6e 100644 --- a/openwisp_firmware_upgrader/tests/test_admin.py +++ b/openwisp_firmware_upgrader/tests/test_admin.py @@ -251,6 +251,83 @@ def test_firmware_image_save_model_dtb_no_flip_without_changed_fields(self): fw.refresh_from_db() self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_SUCCESS) + def test_re_extract_metadata_action(self): + self._login() + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_FAILED, + failure_reason=FirmwareImage.FAILURE_UNSUPPORTED, + extraction_log="log output", + board="TP-Link WDR4300", + compatible=["tplink,tl-wdr4300-v1"], + target="ath79/generic", + fw_version="23.05.5", + compat_version="1.0", + source="fwtool", + ) + Build.objects.filter(pk=image.build_id).update(status=Build.BUILD_STATUS_FAILED) + url = reverse(f"admin:{self.app_label}_firmwareimage_changelist") + with mock.patch( + "openwisp_firmware_upgrader.tasks.extract_firmware_metadata.delay" + ) as mocked_delay: + with self.captureOnCommitCallbacks(execute=True): + r = self.client.post( + url, + { + "action": "re_extract_metadata", + ACTION_CHECKBOX_NAME: (str(image.pk),), + }, + follow=True, + ) + self.assertEqual(r.status_code, 200) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + self.assertEqual(image.extraction_log, "") + self.assertEqual(image.failure_reason, "") + self.assertEqual(image.board, "") + self.assertEqual(image.compatible, []) + self.assertEqual(image.target, "") + self.assertEqual(image.fw_version, "") + self.assertEqual(image.compat_version, "") + self.assertEqual(image.source, "") + image.build.refresh_from_db() + self.assertEqual(image.build.status, Build.BUILD_STATUS_ANALYZING) + mocked_delay.assert_called_once_with(image.pk) + + def test_re_extract_metadata_action_multiple(self): + self._login() + build = self._create_build() + image1 = self._create_firmware_image(build=build, type=self.TPLINK_4300_IMAGE) + image2 = self._create_firmware_image( + build=build, type=self.TPLINK_4300_IL_IMAGE + ) + FirmwareImage.objects.filter(pk__in=[image1.pk, image2.pk]).update( + extraction_status=FirmwareImage.STATUS_FAILED, + ) + Build.objects.filter(pk=build.pk).update(status=Build.BUILD_STATUS_FAILED) + url = reverse(f"admin:{self.app_label}_firmwareimage_changelist") + with mock.patch( + "openwisp_firmware_upgrader.tasks.extract_firmware_metadata.delay" + ) as mocked_delay: + with self.captureOnCommitCallbacks(execute=True): + self.client.post( + url, + { + "action": "re_extract_metadata", + ACTION_CHECKBOX_NAME: (str(image1.pk), str(image2.pk)), + }, + follow=True, + ) + self.assertEqual(mocked_delay.call_count, 2) + called_pks = {call.args[0] for call in mocked_delay.call_args_list} + self.assertEqual(called_pks, {image1.pk, image2.pk}) + image1.refresh_from_db() + image2.refresh_from_db() + self.assertEqual(image1.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + self.assertEqual(image2.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_ANALYZING) + def test_device_firmware_inline_has_add_permission(self): device_fw = self._create_device_firmware() device = device_fw.device