diff --git a/openwisp_firmware_upgrader/admin.py b/openwisp_firmware_upgrader/admin.py index ea25e5dc5..beb7163b1 100644 --- a/openwisp_firmware_upgrader/admin.py +++ b/openwisp_firmware_upgrader/admin.py @@ -1,6 +1,7 @@ import json import logging from datetime import timedelta +from functools import partial import reversion import swapper @@ -12,6 +13,7 @@ from django.core.exceptions import ValidationError from django.core.paginator import InvalidPage, Paginator from django.core.serializers.json import DjangoJSONEncoder +from django.db import transaction from django.forms.formsets import DELETION_FIELD_NAME from django.shortcuts import redirect from django.template.response import TemplateResponse @@ -38,6 +40,7 @@ LocationFilter, ) from .swapper import load_model +from .tasks import extract_firmware_metadata from .utils import get_upgrader_schema_for_device from .widgets import FirmwareSchemaWidget, MassUpgradeSelect2Widget @@ -60,6 +63,45 @@ ) IN_PROGRESS_STATUS = UpgradeOperation.CANCELLABLE_STATUS +_STATUS_CONFIG = { + "unconfirmed": {"label": _("Unconfirmed"), "class": "ow-status-grey"}, + "in_progress": {"label": _("In Progress"), "class": "ow-status-warning"}, + "success": {"label": _("Success"), "class": "ow-status-success"}, + "failed": {"label": _("Failed"), "class": "ow-status-error"}, + "manually_confirmed": { + "label": _("Manually Confirmed"), + "class": "ow-status-success", + }, + "invalid": {"label": _("Invalid"), "class": "ow-status-error"}, +} + +_BUILD_STATUS_CONFIG = { + "analyzing": ("warning", _("Analyzing")), + "success": ("success", _("Success")), + "failed": ("error", _("Failed")), + "invalid": ("error", _("Invalid")), + "manually_confirmed": ("success", _("Manually Confirmed")), +} + +_FAILURE_REASON_TEXT = { + "unsupported_format": _( + "Both fwtool and DTB scan were unable to extract metadata. " + "Fill in the Device Metadata fields below manually." + ), + "out_of_memory": _("Decompression exceeded the configured size or ratio limit."), + "invalid_file": _("This file was rejected as an invalid firmware image."), + "timeout": _("Metadata extraction timed out. Re-upload the image to try again."), +} + + +def _extraction_status_badge(status): + cfg = _STATUS_CONFIG.get(status, {"label": status, "class": "ow-status-grey"}) + return format_html( + '{}', + cfg["class"], + cfg["label"], + ) + class BaseAdmin(MultitenantAdminMixin, TimeReadonlyAdminMixin, admin.ModelAdmin): save_on_top = True @@ -82,6 +124,11 @@ class CategoryAdmin(BaseVersionAdmin): class FirmwareImageInline(TimeReadonlyAdminMixin, admin.StackedInline): model = FirmwareImage extra = 0 + readonly_fields = ["created", "modified", "extraction_status_display"] + + @admin.display(description=_("Extraction Status")) + def extraction_status_display(self, obj): + return _extraction_status_badge(obj.extraction_status) class Media: extra = "" if getattr(settings, "DEBUG", False) else ".min" @@ -108,6 +155,174 @@ def has_change_permission(self, request, obj=None): return True +@admin.register(FirmwareImage) +class FirmwareImageAdmin(BaseAdmin): + list_display = [ + "__str__", + "build", + "type", + "extraction_status_display", + "created", + "modified", + ] + list_filter = ["extraction_status", "build__category"] + search_fields = ["board", "target", "type"] + ordering = ["-created"] + actions = ["re_extract_metadata"] + readonly_fields = [ + "created", + "modified", + "extraction_status_display", + "failure_reason_display", + "extraction_log_display", + "source", + ] + fieldsets = [ + ( + None, + { + "fields": ["build", "file", "type", "created", "modified"], + }, + ), + ( + _("Extraction Status"), + { + "fields": [ + "extraction_status_display", + "failure_reason_display", + "extraction_log_display", + ], + "description": _( + "Metadata is extracted automatically on upload using fwtool " + "(primary) and DTB scan (fallback). " + "If both fail, fill in the Device Metadata fields below manually." + ), + }, + ), + ( + _("Device Metadata"), + { + "fields": [ + "board", + "compatible", + "target", + "fw_version", + "compat_version", + "source", + ], + }, + ), + ] + + def get_readonly_fields(self, request, obj=None): + readonly = list(self.readonly_fields) + if obj: + status = obj.extraction_status + if status in ( + FirmwareImage.STATUS_IN_PROGRESS, + FirmwareImage.STATUS_INVALID, + FirmwareImage.STATUS_MANUALLY_CONFIRMED, + ): + readonly += [ + "board", + "compatible", + "target", + "fw_version", + "compat_version", + ] + elif status == FirmwareImage.STATUS_SUCCESS: + if obj.source == "dtb": + readonly += ["board", "compatible", "compat_version"] + else: + readonly += [ + "board", + "compatible", + "target", + "fw_version", + "compat_version", + ] + return readonly + + @admin.display(description=_("Extraction Status")) + def extraction_status_display(self, obj): + return _extraction_status_badge(obj.extraction_status) + + @admin.display(description=_("Failure Reason")) + def failure_reason_display(self, obj): + if not obj.failure_reason: + return "-" + return _FAILURE_REASON_TEXT.get(obj.failure_reason, obj.failure_reason) + + @admin.display(description=_("Extraction Log")) + def extraction_log_display(self, obj): + if not obj.extraction_log: + return "-" + return format_html( + '
{}
', + obj.extraction_log, + ) + + def save_model(self, request, obj, form, change): + if change and "file" in form.changed_data: + obj.extraction_status = FirmwareImage.STATUS_UNCONFIRMED + obj.extraction_log = "" + obj.failure_reason = "" + obj.board = "" + obj.compatible = [] + obj.target = "" + obj.fw_version = "" + obj.compat_version = "" + obj.source = "" + super().save_model(request, obj, form, change) + transaction.on_commit(lambda: extract_firmware_metadata.delay(obj.pk)) + return + if change: + if obj.extraction_status == FirmwareImage.STATUS_FAILED: + metadata_fields = ["board", "target", "fw_version"] + if any(f in form.changed_data for f in metadata_fields): + obj.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED + obj.source = "manual" + elif ( + obj.extraction_status == FirmwareImage.STATUS_SUCCESS + and obj.source == "dtb" + and any( + field in form.changed_data for field in ["target", "fw_version"] + ) + ): + obj.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED + super().save_model(request, obj, form, change) + + @admin.action( + description=_("Re-extract metadata from selected images"), + permissions=["change"], + ) + def re_extract_metadata(self, request, queryset): + image_pks = list(queryset.values_list("pk", flat=True)) + build_ids = set(queryset.values_list("build_id", flat=True)) + queryset.update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED, + extraction_log="", + failure_reason="", + board="", + compatible=[], + target="", + fw_version="", + compat_version="", + source="", + ) + Build.objects.filter(pk__in=build_ids).update( + status=Build.BUILD_STATUS_ANALYZING + ) + for pk in image_pks: + transaction.on_commit(partial(extract_firmware_metadata.delay, pk)) + self.message_user( + request, + _("Metadata re-extraction scheduled for %(count)d image(s).") + % {"count": len(image_pks)}, + messages.SUCCESS, + ) + + class BatchUpgradeConfirmationForm(forms.ModelForm): upgrade_options = forms.JSONField(widget=FirmwareSchemaWidget(), required=False) build = forms.ModelChoiceField( @@ -172,7 +387,14 @@ class Media: @admin.register(load_model("Build")) class BuildAdmin(BaseAdmin): - list_display = ["__str__", "organization", "category", "created", "modified"] + list_display = [ + "__str__", + "organization", + "category", + "build_status_display", + "created", + "modified", + ] list_filter = [CategoryOrganizationFilter, CategoryFilter] list_select_related = ["category", "category__organization"] search_fields = ["category__name", "version", "os"] @@ -190,6 +412,15 @@ def organization(self, obj): organization.short_description = _("organization") + @admin.display(description=_("Extraction status")) + def build_status_display(self, obj): + css, label = _BUILD_STATUS_CONFIG.get(obj.status, ("grey", obj.status)) + return format_html( + '{}', + css, + label, + ) + @admin.action( description=_("Mass-upgrade devices related to the selected build"), permissions=["change"], diff --git a/openwisp_firmware_upgrader/apps.py b/openwisp_firmware_upgrader/apps.py index ce76c7187..50b7f31e6 100644 --- a/openwisp_firmware_upgrader/apps.py +++ b/openwisp_firmware_upgrader/apps.py @@ -29,6 +29,7 @@ def ready(self, *args, **kwargs): self.connect_device_signals() self.connect_upgrade_signals() self.connect_delete_signals() + self.connect_metadata_signals() def register_menu_groups(self): register_menu_group( @@ -54,6 +55,12 @@ def register_menu_groups(self): "name": "changelist", "icon": "ow-mass-upgrade", }, + 4: { + "label": _("Firmware Images"), + "model": get_model_name(self.label, "FirmwareImage"), + "name": "changelist", + "icon": "ow-firmware", + }, }, "icon": "ow-firmware", }, @@ -116,5 +123,14 @@ def connect_delete_signals(self): dispatch_uid="organization.pre_delete.firmware_files", ) + def connect_metadata_signals(self): + FirmwareImage = load_model("firmware_upgrader", "FirmwareImage") + + post_save.connect( + FirmwareImage.trigger_metadata_extraction, + sender=FirmwareImage, + dispatch_uid="firmware_image.trigger_metadata_extraction", + ) + del ApiAppConfig diff --git a/openwisp_firmware_upgrader/base/models.py b/openwisp_firmware_upgrader/base/models.py index 8bfe6cab2..6f36012e9 100644 --- a/openwisp_firmware_upgrader/base/models.py +++ b/openwisp_firmware_upgrader/base/models.py @@ -9,9 +9,12 @@ from django.core.validators import MaxValueValidator from django.db import models, transaction from django.db.models import Q +from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property +from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ +from openwisp_notifications.signals import notify from private_storage.fields import PrivateFileField from openwisp_controller.connection.exceptions import NoWorkingDeviceConnectionError @@ -38,6 +41,7 @@ batch_upgrade_operation, create_all_device_firmwares, create_device_firmware, + extract_firmware_metadata, upgrade_firmware, ) from ..utils import ( @@ -145,6 +149,28 @@ class AbstractBuild(TimeStampedEditableModel): ), ) + BUILD_STATUS_ANALYZING = "analyzing" + BUILD_STATUS_SUCCESS = "success" + BUILD_STATUS_FAILED = "failed" + BUILD_STATUS_INVALID = "invalid" + BUILD_STATUS_MANUALLY_CONFIRMED = "manually_confirmed" + + BUILD_STATUS_CHOICES = [ + (BUILD_STATUS_ANALYZING, _("Analyzing")), + (BUILD_STATUS_SUCCESS, _("Success")), + (BUILD_STATUS_FAILED, _("Failed")), + (BUILD_STATUS_INVALID, _("Invalid")), + (BUILD_STATUS_MANUALLY_CONFIRMED, _("Manually Confirmed")), + ] + + status = models.CharField( + _("extraction status"), + max_length=20, + choices=BUILD_STATUS_CHOICES, + default=BUILD_STATUS_ANALYZING, + db_index=True, + ) + class Meta: abstract = True verbose_name = _("Firmware Build") @@ -185,6 +211,20 @@ def batch_upgrade( self, firmwareless, upgrade_options=None, group=None, location=None ): upgrade_options = upgrade_options or {} + FirmwareImage = load_model("FirmwareImage") + unconfirmed = self.firmwareimage_set.exclude( + extraction_status__in=[ + FirmwareImage.STATUS_SUCCESS, + FirmwareImage.STATUS_MANUALLY_CONFIRMED, + ] + ) + if unconfirmed.exists(): + raise ValidationError( + _( + "All firmware images must have confirmed metadata " + "before a mass upgrade can be scheduled" + ) + ) # Check if there are any devices to upgrade with the given filters dry_run_result = load_model("BatchUpgradeOperation").dry_run( build=self, group=group, location=location @@ -256,6 +296,76 @@ def _find_firmwareless_devices(self, boards=None, group=None, location=None): qs = qs.filter(devicelocation__location=location) return qs.order_by("-created") + def _update_extraction_status(self): + Build = load_model("Build") + FirmwareImage = load_model("FirmwareImage") + statuses = set( + FirmwareImage.objects.filter(build_id=self.pk).values_list( + "extraction_status", flat=True + ) + ) + if not statuses: + return + analyzing = {FirmwareImage.STATUS_UNCONFIRMED, FirmwareImage.STATUS_IN_PROGRESS} + if statuses & analyzing: + new_status = self.BUILD_STATUS_ANALYZING + elif FirmwareImage.STATUS_INVALID in statuses: + new_status = self.BUILD_STATUS_INVALID + elif FirmwareImage.STATUS_FAILED in statuses: + new_status = self.BUILD_STATUS_FAILED + elif FirmwareImage.STATUS_MANUALLY_CONFIRMED in statuses: + new_status = self.BUILD_STATUS_MANUALLY_CONFIRMED + else: + new_status = self.BUILD_STATUS_SUCCESS + rows_updated = Build.objects.filter( + pk=self.pk, status=self.BUILD_STATUS_ANALYZING + ).update(status=new_status) + if rows_updated: + self.status = new_status + if new_status != self.BUILD_STATUS_ANALYZING: + self._notify_extraction_complete(new_status) + return + Build.objects.filter(pk=self.pk).exclude(status=new_status).update( + status=new_status + ) + self.status = new_status + + def _notify_extraction_complete(self, new_status): + level = ( + "info" + if new_status + in ( + self.BUILD_STATUS_SUCCESS, + self.BUILD_STATUS_MANUALLY_CONFIRMED, + ) + else "warning" + ) + status_display = dict(self.BUILD_STATUS_CHOICES)[new_status] + try: + opts = self.__class__._meta + admin_url = reverse( + f"admin:{opts.app_label}_{opts.model_name}_change", + args=[str(self.pk)], + ) + notify.send( + sender=self, + type="generic_message", + level=level, + url=admin_url, + target=self, + message=format_html( + _( + 'Metadata extraction for build {build} ' + "completed with status: {status}." + ), + url=admin_url, + build=self, + status=status_display, + ), + ) + except Exception: + logger.exception("Failed to send build extraction completion notification") + def get_build_directory(instance, filename): build_pk = str(instance.build.pk) @@ -282,6 +392,65 @@ class AbstractFirmwareImage(TimeStampedEditableModel): ), ) + STATUS_UNCONFIRMED = "unconfirmed" + STATUS_IN_PROGRESS = "in_progress" + STATUS_SUCCESS = "success" + STATUS_FAILED = "failed" + STATUS_MANUALLY_CONFIRMED = "manually_confirmed" + STATUS_INVALID = "invalid" + LOCKED_STATUSES = (STATUS_SUCCESS, STATUS_MANUALLY_CONFIRMED) + + FAILURE_UNSUPPORTED = "unsupported_format" + FAILURE_OOM = "out_of_memory" + FAILURE_INVALID = "invalid_file" + FAILURE_TIMEOUT = "timeout" + + EXTRACTION_STATUS_CHOICES = [ + (STATUS_UNCONFIRMED, _("Unconfirmed")), + (STATUS_IN_PROGRESS, _("In Progress")), + (STATUS_SUCCESS, _("Success")), + (STATUS_FAILED, _("Failed")), + (STATUS_MANUALLY_CONFIRMED, _("Manually Confirmed")), + (STATUS_INVALID, _("Invalid")), + ] + + FAILURE_REASON_CHOICES = [ + (FAILURE_UNSUPPORTED, _("Unsupported format")), + (FAILURE_OOM, _("Out of memory")), + (FAILURE_INVALID, _("Invalid file")), + (FAILURE_TIMEOUT, _("Extraction timed out")), + ] + + extraction_status = models.CharField( + _("extraction status"), + max_length=20, + choices=EXTRACTION_STATUS_CHOICES, + default=STATUS_UNCONFIRMED, + db_index=True, + ) + extraction_log = models.TextField(_("extraction log"), blank=True) + failure_reason = models.CharField( + _("failure reason"), + max_length=20, + choices=FAILURE_REASON_CHOICES, + blank=True, + default="", + ) + board = models.CharField(_("board"), max_length=200, blank=True) + compatible = models.JSONField(_("compatible"), default=list, blank=True) + target = models.CharField(_("target"), max_length=100, blank=True) + fw_version = models.CharField(_("firmware version"), max_length=50, blank=True) + compat_version = models.CharField(_("compat version"), max_length=10, blank=True) + source = models.CharField(_("source"), max_length=20, blank=True) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._original_extraction_status = self.extraction_status + + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + self._original_extraction_status = self.extraction_status + class Meta: abstract = True verbose_name = _("Firmware Image") @@ -299,12 +468,9 @@ def boards(self): def clean(self): self._clean_type() + self._validate_locked() self._validate_file_header() self._validate_rootfs() - try: - self.boards - except KeyError: - raise ValidationError({"type": "Could not find boards for this type"}) def delete(self, *args, **kwargs): super().delete(*args, **kwargs) @@ -342,15 +508,58 @@ def _remove_file(cls, file_path): return True def _clean_type(self): - """ - auto determine type if missing - """ if self.type: return filename = self.file.name - # removes leading prefix self.type = "-".join(filename.split("-")[1:]) + @classmethod + def trigger_metadata_extraction(cls, instance, created, **kwargs): + if not created: + return + Build = load_model("Build") + Build.objects.filter(pk=instance.build_id).update( + status=Build.BUILD_STATUS_ANALYZING + ) + transaction.on_commit(lambda: extract_firmware_metadata.delay(str(instance.pk))) + + def _validate_locked(self): + if self._state.adding or not self.pk: + return + original = ( + self.__class__.objects.filter(pk=self.pk) + .values( + "extraction_status", + "board", + "compatible", + "target", + "fw_version", + "compat_version", + "source", + ) + .first() + ) + if not original: + return + if ( + original["extraction_status"] not in self.LOCKED_STATUSES + and self.extraction_status not in self.LOCKED_STATUSES + ): + return + for field in ( + "board", + "compatible", + "target", + "fw_version", + "compat_version", + "source", + ): + original_val = original[field] + if original_val and getattr(self, field) != original_val: + raise ValidationError( + _("Metadata fields are read-only after confirmation.") + ) + def _validate_file_header(self): if not self.file: return @@ -449,6 +658,16 @@ class Meta: def clean(self): if not hasattr(self, "image") or not hasattr(self, "device"): return + if self.image.extraction_status not in self.image.LOCKED_STATUSES: + raise ValidationError( + { + "image": _( + "This firmware image's metadata has not been confirmed yet. " + "Metadata extraction must complete successfully " + "before it can be used for upgrades." + ) + } + ) if ( self.image.build.category.organization is not None and self.image.build.category.organization != self.device.organization @@ -567,9 +786,18 @@ def auto_add_device_firmware_to_device(cls, instance, created, **kwargs): @classmethod def auto_create_device_firmwares(cls, instance, created, **kwargs): if created: - transaction.on_commit( - partial(create_all_device_firmwares.delay, instance.pk) - ) + return + confirmed = ( + instance.STATUS_SUCCESS, + instance.STATUS_MANUALLY_CONFIRMED, + ) + if instance.extraction_status not in confirmed: + return + if instance._original_extraction_status in confirmed: + return + transaction.on_commit( + partial(create_all_device_firmwares.delay, str(instance.pk)) + ) @classmethod def get_image_queryset_for_device(cls, device, device_firmware=None): diff --git a/openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py b/openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py new file mode 100644 index 000000000..9a13e189d --- /dev/null +++ b/openwisp_firmware_upgrader/migrations/0018_build_status_firmwareimage_board_and_more.py @@ -0,0 +1,100 @@ +# Generated by Django 5.2.13 on 2026-05-22 16:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("firmware_upgrader", "0017_alter_batchupgradeoperation_status"), + ] + + operations = [ + migrations.AddField( + model_name="build", + name="status", + field=models.CharField( + choices=[ + ("analyzing", "Analyzing"), + ("success", "Success"), + ("failed", "Failed"), + ("invalid", "Invalid"), + ("manually_confirmed", "Manually Confirmed"), + ], + db_index=True, + default="analyzing", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200), + ), + migrations.AddField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField(blank=True, max_length=10), + ), + migrations.AddField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_status", + field=models.CharField( + choices=[ + ("unconfirmed", "Unconfirmed"), + ("in_progress", "In Progress"), + ("success", "Success"), + ("failed", "Failed"), + ("manually_confirmed", "Manually Confirmed"), + ("invalid", "Invalid"), + ], + db_index=True, + default="unconfirmed", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="fw_version", + field=models.CharField( + blank=True, max_length=50, verbose_name="firmware version" + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20), + ), + migrations.AddField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100), + ), + ] diff --git a/openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py b/openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py new file mode 100644 index 000000000..e1a4ba8e3 --- /dev/null +++ b/openwisp_firmware_upgrader/migrations/0019_backfill_extraction_status.py @@ -0,0 +1,30 @@ +from django.db import migrations + + +def backfill_firmware_image_status(apps, schema_editor): + FirmwareImage = apps.get_model("firmware_upgrader", "FirmwareImage") + FirmwareImage.objects.filter(extraction_status="unconfirmed").update( + extraction_status="manually_confirmed", + source="manual", + ) + + +def backfill_build_status(apps, schema_editor): + Build = apps.get_model("firmware_upgrader", "Build") + Build.objects.filter(status="analyzing").update(status="manually_confirmed") + + +class Migration(migrations.Migration): + dependencies = [ + ("firmware_upgrader", "0018_build_status_firmwareimage_board_and_more"), + ] + operations = [ + migrations.RunPython( + backfill_firmware_image_status, + reverse_code=migrations.RunPython.noop, + ), + migrations.RunPython( + backfill_build_status, + reverse_code=migrations.RunPython.noop, + ), + ] diff --git a/openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py b/openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py new file mode 100644 index 000000000..87056a3e5 --- /dev/null +++ b/openwisp_firmware_upgrader/migrations/0020_alter_firmwareimage_board_and_more.py @@ -0,0 +1,61 @@ +# Generated by Django 5.2.14 on 2026-06-15 20:36 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("firmware_upgrader", "0019_backfill_extraction_status"), + ] + + operations = [ + migrations.AlterField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200, verbose_name="board"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField( + blank=True, max_length=10, verbose_name="compat version" + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list, verbose_name="compatible"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True, verbose_name="extraction log"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + verbose_name="failure reason", + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20, verbose_name="source"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100, verbose_name="target"), + ), + ] diff --git a/openwisp_firmware_upgrader/tasks.py b/openwisp_firmware_upgrader/tasks.py index 38ad83bdd..c1abf7dee 100644 --- a/openwisp_firmware_upgrader/tasks.py +++ b/openwisp_firmware_upgrader/tasks.py @@ -4,12 +4,17 @@ from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded from django.core.exceptions import ObjectDoesNotExist +from django.urls import reverse +from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ +from openwisp_notifications.signals import notify from openwisp_utils.tasks import OpenwispCeleryTask from . import settings as app_settings from .exceptions import RecoverableFailure +from .extractors.exceptions import DecompressionLimitExceeded, UnsupportedImageError +from .extractors.openwrt import OpenWrtMetadataExtractor from .swapper import load_model logger = logging.getLogger(__name__) @@ -97,3 +102,157 @@ def delete_firmware_files(files_to_delete): FirmwareImage = load_model("FirmwareImage") for file_path in files_to_delete: FirmwareImage._remove_file(file_path) + + +def _compat_blocks_pairing(compat_version): + try: + major, minor = (int(x) for x in str(compat_version).split(".")) + return (major, minor) > (1, 0) + except (ValueError, AttributeError, TypeError): + return False + + +@shared_task(bind=True, soft_time_limit=app_settings.TASK_TIMEOUT) +def extract_firmware_metadata(self, image_pk): + FirmwareImage = load_model("FirmwareImage") + + try: + image = FirmwareImage.objects.get(pk=image_pk) + except FirmwareImage.DoesNotExist: + logger.warning( + "extract_firmware_metadata: FirmwareImage pk=%s not found, skipping", + image_pk, + ) + return + + updated = FirmwareImage.objects.filter( + pk=image_pk, + extraction_status=FirmwareImage.STATUS_UNCONFIRMED, + ).update(extraction_status=FirmwareImage.STATUS_IN_PROGRESS) + if not updated: + return + log_lines = [f"[+] Analyzing: {image.file.name}"] + update = {} + + try: + extractor_class = getattr( + image.build.category.__class__, + "metadata_extractor_class", + OpenWrtMetadataExtractor, + ) + meta = extractor_class(image.file.path).extract() + log_lines.append("[+] extraction: success") + update = { + "extraction_status": FirmwareImage.STATUS_SUCCESS, + "extraction_log": "\n".join(log_lines), + "board": meta.get("model", ""), + "compatible": meta.get("compatible", []), + "target": meta.get("target", ""), + "fw_version": meta.get("version", ""), + "compat_version": meta.get("compat_version", ""), + "source": meta.get("source", "fwtool"), + } + + except SoftTimeLimitExceeded: + log_lines.append(f"[!] Task timed out after {app_settings.TASK_TIMEOUT}s.") + update = { + "extraction_status": FirmwareImage.STATUS_FAILED, + "failure_reason": FirmwareImage.FAILURE_TIMEOUT, + "extraction_log": "\n".join(log_lines), + } + logger.warning( + "extract_firmware_metadata: soft time limit exceeded for pk=%s", + image_pk, + ) + + except DecompressionLimitExceeded as exc: + log_lines.append(f"[!] {exc}") + update = { + "extraction_status": FirmwareImage.STATUS_FAILED, + "failure_reason": FirmwareImage.FAILURE_OOM, + "extraction_log": "\n".join(log_lines), + } + logger.warning( + "extract_firmware_metadata: decompression limit exceeded for pk=%s - %s", + image_pk, + exc, + ) + + except UnsupportedImageError as exc: + log_lines.append(f"[-] fwtool: {exc}") + log_lines.append("[!] Extraction failed. Manual input required.") + update = { + "extraction_status": FirmwareImage.STATUS_FAILED, + "failure_reason": FirmwareImage.FAILURE_UNSUPPORTED, + "extraction_log": "\n".join(log_lines), + } + logger.warning( + "extract_firmware_metadata: unsupported image pk=%s - %s", + image_pk, + exc, + ) + + except Exception: + log_lines.append("[!] Unexpected error during extraction.") + update = { + "extraction_status": FirmwareImage.STATUS_INVALID, + "failure_reason": FirmwareImage.FAILURE_INVALID, + "extraction_log": "\n".join(log_lines), + } + logger.exception( + "extract_firmware_metadata: unhandled exception for pk=%s", + image_pk, + ) + + FirmwareImage.objects.filter(pk=image_pk).update(**update) + + if update.get("extraction_status") not in ( + FirmwareImage.STATUS_SUCCESS, + FirmwareImage.STATUS_IN_PROGRESS, + ): + try: + image = FirmwareImage.objects.select_related( + "build", "build__category" + ).get(pk=image_pk) + build_opts = image.build._meta + admin_url = reverse( + f"admin:{build_opts.app_label}_{build_opts.model_name}_change", + args=[str(image.build_id)], + ) + notify.send( + sender=image, + type="generic_message", + level="error", + url=admin_url, + target=image.build, + message=format_html( + _( + 'Metadata extraction failed for {image}: ' + "{reason}. You can manually enter metadata or re-upload the image." + ), + url=admin_url, + image=image, + reason=update.get("failure_reason", "unknown error"), + ), + ) + except Exception: + logger.exception("Failed to send extraction failure notification") + + try: + fresh = FirmwareImage.objects.select_related("build").get(pk=image_pk) + fresh.build._update_extraction_status() + except Exception: + logger.exception( + "Failed to update build extraction status for image %s", image_pk + ) + + if update.get("extraction_status") == FirmwareImage.STATUS_SUCCESS: + compat = update.get("compat_version", "") + if _compat_blocks_pairing(compat): + logger.info( + "Auto-pairing skipped for image %s: compat_version %s > 1.0", + image_pk, + compat, + ) + else: + create_all_device_firmwares.delay(str(image_pk)) diff --git a/openwisp_firmware_upgrader/tests/base.py b/openwisp_firmware_upgrader/tests/base.py index 79614f3b2..fe32466bb 100644 --- a/openwisp_firmware_upgrader/tests/base.py +++ b/openwisp_firmware_upgrader/tests/base.py @@ -77,6 +77,8 @@ def _create_build(self, **kwargs): def _create_firmware_image(self, **kwargs): opts = dict(type=self.TPLINK_4300_IMAGE) opts.update(kwargs) + if "extraction_status" not in opts: + opts["extraction_status"] = FirmwareImage.STATUS_SUCCESS category_opts = {} if "organization" in opts: category_opts["organization"] = opts.pop("organization") diff --git a/openwisp_firmware_upgrader/tests/test_admin.py b/openwisp_firmware_upgrader/tests/test_admin.py index 344218a01..50fd7db6e 100644 --- a/openwisp_firmware_upgrader/tests/test_admin.py +++ b/openwisp_firmware_upgrader/tests/test_admin.py @@ -21,6 +21,7 @@ DeviceFirmwareForm, DeviceFirmwareInline, DeviceUpgradeOperationInline, + FirmwareImageAdmin, FirmwareImageInline, admin, ) @@ -213,6 +214,120 @@ def test_firmware_image_has_change_permission(self): self.assertIs(inline.has_change_permission(request), True) self.assertIs(inline.has_change_permission(request, obj=env["image1a"]), False) + def test_firmware_image_save_model_clears_compat_version_on_file_change(self): + fw = self._create_firmware_image() + FirmwareImage.objects.filter(pk=fw.pk).update( + board="TP-Link WDR4300", + compat_version="21.09", + extraction_status=FirmwareImage.STATUS_SUCCESS, + ) + fw.refresh_from_db() + request = MockRequest() + request.user = User.objects.first() + form = mock.MagicMock() + form.changed_data = ["file"] + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + with mock.patch("django.db.transaction.on_commit"): + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.board, "") + self.assertEqual(fw.compat_version, "") + + def test_firmware_image_save_model_dtb_no_flip_without_changed_fields(self): + fw = self._create_firmware_image() + FirmwareImage.objects.filter(pk=fw.pk).update( + source="dtb", + board="Orange Pi Zero", + extraction_status=FirmwareImage.STATUS_SUCCESS, + ) + fw.refresh_from_db() + request = MockRequest() + request.user = User.objects.first() + form = mock.MagicMock() + form.changed_data = ["board"] + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + with mock.patch("django.db.transaction.on_commit"): + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_SUCCESS) + + def test_re_extract_metadata_action(self): + self._login() + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_FAILED, + failure_reason=FirmwareImage.FAILURE_UNSUPPORTED, + extraction_log="log output", + board="TP-Link WDR4300", + compatible=["tplink,tl-wdr4300-v1"], + target="ath79/generic", + fw_version="23.05.5", + compat_version="1.0", + source="fwtool", + ) + Build.objects.filter(pk=image.build_id).update(status=Build.BUILD_STATUS_FAILED) + url = reverse(f"admin:{self.app_label}_firmwareimage_changelist") + with mock.patch( + "openwisp_firmware_upgrader.tasks.extract_firmware_metadata.delay" + ) as mocked_delay: + with self.captureOnCommitCallbacks(execute=True): + r = self.client.post( + url, + { + "action": "re_extract_metadata", + ACTION_CHECKBOX_NAME: (str(image.pk),), + }, + follow=True, + ) + self.assertEqual(r.status_code, 200) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + self.assertEqual(image.extraction_log, "") + self.assertEqual(image.failure_reason, "") + self.assertEqual(image.board, "") + self.assertEqual(image.compatible, []) + self.assertEqual(image.target, "") + self.assertEqual(image.fw_version, "") + self.assertEqual(image.compat_version, "") + self.assertEqual(image.source, "") + image.build.refresh_from_db() + self.assertEqual(image.build.status, Build.BUILD_STATUS_ANALYZING) + mocked_delay.assert_called_once_with(image.pk) + + def test_re_extract_metadata_action_multiple(self): + self._login() + build = self._create_build() + image1 = self._create_firmware_image(build=build, type=self.TPLINK_4300_IMAGE) + image2 = self._create_firmware_image( + build=build, type=self.TPLINK_4300_IL_IMAGE + ) + FirmwareImage.objects.filter(pk__in=[image1.pk, image2.pk]).update( + extraction_status=FirmwareImage.STATUS_FAILED, + ) + Build.objects.filter(pk=build.pk).update(status=Build.BUILD_STATUS_FAILED) + url = reverse(f"admin:{self.app_label}_firmwareimage_changelist") + with mock.patch( + "openwisp_firmware_upgrader.tasks.extract_firmware_metadata.delay" + ) as mocked_delay: + with self.captureOnCommitCallbacks(execute=True): + self.client.post( + url, + { + "action": "re_extract_metadata", + ACTION_CHECKBOX_NAME: (str(image1.pk), str(image2.pk)), + }, + follow=True, + ) + self.assertEqual(mocked_delay.call_count, 2) + called_pks = {call.args[0] for call in mocked_delay.call_args_list} + self.assertEqual(called_pks, {image1.pk, image2.pk}) + image1.refresh_from_db() + image2.refresh_from_db() + self.assertEqual(image1.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + self.assertEqual(image2.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_ANALYZING) + def test_device_firmware_inline_has_add_permission(self): device_fw = self._create_device_firmware() device = device_fw.device @@ -548,8 +663,7 @@ def test_deactivated_firmware_image_inline(self): # is displayed as readonly in the admin interface. self.assertContains( response, - '
Test Category v0.1:' - " TP-Link WDR4300 v1 (OpenWrt 19.07 and later)
", + "Test Category v0.1: TP-Link WDR4300 v1 (OpenWrt 19.07 and later)", ) self.assertNotContains( response, @@ -1069,6 +1183,144 @@ def test_device_upgrade_operation_inline_delete_mixed_statuses(self): self.assertIn("disabled", in_progress_delete_input) self.assertLess(failed_index, in_progress_index) + def test_firmware_image_readonly_fields_in_progress(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_IN_PROGRESS + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + def test_firmware_image_readonly_fields_success_fwtool(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.source = "fwtool" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + def test_firmware_image_readonly_fields_success_dtb(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.source = "dtb" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + for field in ["target", "fw_version"]: + with self.subTest(field=field): + self.assertNotIn(field, readonly) + + def test_firmware_image_readonly_fields_failed(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_FAILED + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertNotIn(field, readonly) + + def test_firmware_image_readonly_fields_manually_confirmed(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_MANUALLY_CONFIRMED + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + def test_firmware_image_readonly_fields_invalid(self): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_INVALID + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + readonly = fw_admin.get_readonly_fields(request, obj=fw) + for field in ["board", "compatible", "target", "fw_version", "compat_version"]: + with self.subTest(field=field): + self.assertIn(field, readonly) + + @mock.patch("openwisp_firmware_upgrader.admin.extract_firmware_metadata") + def test_firmware_image_save_model_file_change_triggers_extraction(self, mock_task): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.board = "Old Board" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + form = mock.MagicMock() + form.changed_data = ["file"] + with self.captureOnCommitCallbacks(execute=True): + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_UNCONFIRMED) + self.assertEqual(fw.board, "") + mock_task.delay.assert_called_once_with(fw.pk) + + @mock.patch("openwisp_firmware_upgrader.admin.extract_firmware_metadata") + def test_firmware_image_save_model_failed_to_manually_confirmed(self, mock_task): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_FAILED + fw.board = "Generic x86" + fw.target = "x86/64" + fw.fw_version = "23.05.5" + fw.save() + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + form = mock.MagicMock() + form.changed_data = ["board", "target"] + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_MANUALLY_CONFIRMED) + self.assertEqual(fw.source, "manual") + mock_task.delay.assert_not_called() + + @mock.patch("openwisp_firmware_upgrader.admin.extract_firmware_metadata") + def test_firmware_image_save_model_dtb_success_to_manually_confirmed( + self, mock_task + ): + fw = self._create_firmware_image() + fw.extraction_status = FirmwareImage.STATUS_SUCCESS + fw.source = "dtb" + fw.board = "Xunlong Orange Pi Zero" + fw.target = "" + fw.fw_version = "" + fw.save() + fw.target = "sunxi/cortexa7" + fw.fw_version = "23.05.5" + request = MockRequest() + request.user = User.objects.first() + fw_admin = FirmwareImageAdmin(FirmwareImage, admin.site) + form = mock.MagicMock() + form.changed_data = ["target", "fw_version"] + fw_admin.save_model(request, fw, form, change=True) + fw.refresh_from_db() + self.assertEqual(fw.extraction_status, FirmwareImage.STATUS_MANUALLY_CONFIRMED) + self.assertEqual(fw.source, "dtb") + mock_task.delay.assert_not_called() + class TestAdminTransaction( BaseTestAdmin, AdminActionPermTestMixin, TransactionTestCase diff --git a/openwisp_firmware_upgrader/tests/test_api.py b/openwisp_firmware_upgrader/tests/test_api.py index c0a8724c9..cdd9023a4 100644 --- a/openwisp_firmware_upgrader/tests/test_api.py +++ b/openwisp_firmware_upgrader/tests/test_api.py @@ -326,7 +326,7 @@ def test_api_batch_upgrade(self): self.assertEqual(BatchUpgradeOperation.objects.count(), 0) with self.subTest("Existing build"): url = reverse("upgrader:api_build_batch_upgrade", args=[build.pk]) - with self.assertNumQueries(10): + with self.assertNumQueries(11): r = self.client.post(url) self.assertEqual(BatchUpgradeOperation.objects.count(), 1) batch = BatchUpgradeOperation.objects.first() @@ -416,7 +416,7 @@ def test_api_shared_build_batch_upgrade(self): with self.subTest( "Test superuser can mass upgrade shared build with upgrade_all" ): - with self.assertNumQueries(8): + with self.assertNumQueries(9): response = self.client.post(path, {"upgrade_all": True}) self.assertEqual(response.status_code, 201) batch = BatchUpgradeOperation.objects.first() @@ -1159,7 +1159,7 @@ def test_firmware_create(self): "file": self._get_simpleuploadedfile(self.FAKE_IMAGE_PATH2), "type": self.TPLINK_4300_IMAGE, } - with self.assertNumQueries(9): + with self.assertNumQueries(10): r = self.client.post(url, data) self.assertEqual(r.status_code, 201) self.assertEqual(FirmwareImage.objects.count(), 1) diff --git a/openwisp_firmware_upgrader/tests/test_extractors.py b/openwisp_firmware_upgrader/tests/test_extractors.py index e462c528e..081ce5a4a 100644 --- a/openwisp_firmware_upgrader/tests/test_extractors.py +++ b/openwisp_firmware_upgrader/tests/test_extractors.py @@ -8,6 +8,8 @@ ExtractionError, UnsupportedImageError, ) +from ..extractors.openwrt import OpenWrtMetadataExtractor +from ..upgraders.openwrt import OpenWrt class ConcreteSuccessExtractor(BaseMetadataExtractor): @@ -85,3 +87,6 @@ def test_image_path_stored_as_string(self): extractor = ConcreteSuccessExtractor(Path("/fake/path.bin")) self.assertIsInstance(extractor.image_path, str) self.assertEqual(extractor.image_path, "/fake/path.bin") + + def test_metadata_extractor_class_is_openwrt_extractor(self): + self.assertIs(OpenWrt.metadata_extractor_class, OpenWrtMetadataExtractor) diff --git a/openwisp_firmware_upgrader/tests/test_models.py b/openwisp_firmware_upgrader/tests/test_models.py index c421c58a4..80f1acce9 100644 --- a/openwisp_firmware_upgrader/tests/test_models.py +++ b/openwisp_firmware_upgrader/tests/test_models.py @@ -14,7 +14,6 @@ from openwisp_utils.tests import capture_any_output from .. import settings as app_settings -from ..hardware import FIRMWARE_IMAGE_MAP, REVERSE_FIRMWARE_IMAGE_MAP from ..swapper import load_model from ..tasks import upgrade_firmware from .base import TestUpgraderMixin @@ -36,7 +35,7 @@ class TestModels(TestUpgraderMixin, TestCase): app_label = "openwisp_firmware_upgrader" os = "OpenWrt 19.07-SNAPSHOT r11061-6ffd4d8a4d" - image_type = REVERSE_FIRMWARE_IMAGE_MAP["YunCore XD3200"] + image_type = "ar71xx-generic-xd3200-squashfs-sysupgrade.bin" def test_category_str(self): c = Category(name="WiFi Hotspot") @@ -195,22 +194,6 @@ def test_device_fw_uninstalled_without_credentials_rejected(self): device_fw.full_clean() self.assertIn("related connection", str(ctx.exception)) - def test_invalid_board(self): - image = FIRMWARE_IMAGE_MAP[self.TPLINK_4300_IMAGE] - boards = image["boards"] - del image["boards"] - err = None - try: - self._create_firmware_image() - except ValidationError as e: - err = e - image["boards"] = boards - if err: - self.assertIn("type", err.message_dict) - self.assertIn("not find boards", str(err)) - else: - self.fail("ValidationError not raised") - def test_custom_image_type_present(self): t = FirmwareImage._meta.get_field("type") custom_images = app_settings.CUSTOM_OPENWRT_IMAGES @@ -584,6 +567,192 @@ def test_upgrade_operation_str(self): expected = f"{uo.device} ({timezone.localtime(uo.created).strftime('%Y-%m-%d %H:%M:%S')})" self.assertEqual(str(uo), expected) + def test_firmware_image_rejects_invalid_file_headers(self): + build = self._get_build() + invalid_headers = [ + (b"\xff\xd8\xff" + b"\x00" * 20, "JPEG"), + (b"%PDF" + b"\x00" * 20, "PDF"), + (b"\x89PNG\r\n\x1a\n" + b"\x00" * 20, "PNG"), + (b"PK\x03\x04" + b"\x00" * 20, "ZIP"), + (b"\x7fELF" + b"\x00" * 20, "ELF"), + ] + for content, label in invalid_headers: + with self.subTest(file_type=label): + fw = FirmwareImage( + build=build, + type=self.TPLINK_4300_IMAGE, + file=SimpleUploadedFile( + name=f"openwrt-{self.TPLINK_4300_IMAGE}", + content=content, + content_type="application/octet-stream", + ), + ) + try: + fw.full_clean() + except ValidationError as e: + self.assertIn("file", e.message_dict) + else: + self.fail(f"ValidationError not raised for {label} file") + + def test_firmware_image_rejects_rootfs_image(self): + build = self._get_build() + fw = FirmwareImage( + build=build, + type=self.TPLINK_4300_IMAGE, + file=SimpleUploadedFile( + name="ath79-generic-tplink_tl-wdr4300-v1-squashfs-rootfs.img", + content=b"\x00" * 100, + content_type="application/octet-stream", + ), + ) + try: + fw.full_clean() + except ValidationError as e: + self.assertIn("file", e.message_dict) + else: + self.fail("ValidationError not raised for rootfs image") + + def test_batch_upgrade_blocked_with_unconfirmed_images(self): + env = self._create_upgrade_env() + build = env["build2"] + FirmwareImage.objects.filter(build=build).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + with self.assertRaises(ValidationError) as ctx: + build.batch_upgrade(firmwareless=False) + self.assertIn("confirmed metadata", str(ctx.exception)) + + def test_batch_upgrade_allowed_when_all_images_confirmed(self): + env = self._create_upgrade_env() + build = env["build2"] + build.firmwareimage_set.update(extraction_status=FirmwareImage.STATUS_SUCCESS) + batch = build.batch_upgrade(firmwareless=False) + self.assertIsNotNone(batch) + + def test_update_extraction_status_all_success(self): + build = self._create_build() + build.status = Build.BUILD_STATUS_ANALYZING + build.save() + image = self._create_firmware_image(build=build) + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.save() + build._update_extraction_status() + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_SUCCESS) + + def test_update_extraction_status_with_failed(self): + build = self._create_build() + build.status = Build.BUILD_STATUS_ANALYZING + build.save() + image = self._create_firmware_image(build=build) + image.extraction_status = FirmwareImage.STATUS_FAILED + image.save() + build._update_extraction_status() + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_FAILED) + + def test_update_extraction_status_analyzing_takes_priority(self): + build = self._create_build() + build.status = Build.BUILD_STATUS_ANALYZING + build.save() + image1 = self._create_firmware_image(build=build) + image1.extraction_status = FirmwareImage.STATUS_FAILED + image1.save() + image2 = self._create_firmware_image( + build=build, type=self.TPLINK_4300_IL_IMAGE + ) + image2.extraction_status = FirmwareImage.STATUS_IN_PROGRESS + image2.save() + build._update_extraction_status() + build.refresh_from_db() + self.assertEqual(build.status, Build.BUILD_STATUS_ANALYZING) + + def test_visible_locked_blocks_field_change_on_success(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.board = "TP-Link WDR4300" + image.target = "ath79/generic" + image.source = "fwtool" + image.save() + image.board = "Changed Board" + with self.assertRaises(ValidationError) as ctx: + image._validate_locked() + self.assertIn("read-only", str(ctx.exception)) + + def test_validate_locked_allows_change_when_failed(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_FAILED + image.board = "" + image.save() + image.board = "Manually entered" + image._validate_locked() + + def test_validate_locked_allows_filling_empty_dtb_fields(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.board = "Orange Pi Zero" + image.target = "" + image.source = "dtb" + image.save() + image.target = "sunxi/cortexa7" + image._validate_locked() + + def test_validate_locked_blocks_bypass_via_status_change(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_SUCCESS + image.board = "TP-Link WDR4300" + image.target = "ath79/generic" + image.source = "fwtool" + image.save() + image.extraction_status = FirmwareImage.STATUS_FAILED + image.board = "Tampered board" + with self.assertRaises(ValidationError) as ctx: + image._validate_locked() + self.assertIn("read-only", str(ctx.exception)) + + @capture_any_output() + def test_device_firmware_clean_blocks_unconfirmed_image(self): + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + image.refresh_from_db() + device = self._create_device(organization=image.build.category.organization) + self._create_config(device=device) + device_fw = DeviceFirmware() + device_fw.image = image + device_fw.device = device + with self.assertRaises(ValidationError) as ctx: + device_fw.clean() + self.assertIn("image", ctx.exception.message_dict) + + def test_auto_create_device_firmwares_skip_unconfirmed(self): + image = self._create_firmware_image() + image.extraction_status = FirmwareImage.STATUS_UNCONFIRMED + image.save() + with mock.patch("django.db.transaction.on_commit") as mock_on_commit: + DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) + mock_on_commit.assert_not_called() + + def test_auto_create_device_firmwares_triggers_on_confirmed(self): + image = self._create_firmware_image( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + image.extraction_status = FirmwareImage.STATUS_SUCCESS + with mock.patch("django.db.transaction.on_commit") as mock_on_commit: + DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) + mock_on_commit.assert_called_once() + + def test_auto_create_device_firmwares_skips_already_confirmed_resave(self): + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_SUCCESS + ) + image = FirmwareImage.objects.get(pk=image.pk) + with mock.patch("django.db.transaction.on_commit") as mock_on_commit: + DeviceFirmware.auto_create_device_firmwares(instance=image, created=False) + mock_on_commit.assert_not_called() + class TestModelsTransaction(TestUpgraderMixin, TransactionTestCase): _mock_updrade = "openwisp_firmware_upgrader.upgraders.openwrt.OpenWrt.upgrade" diff --git a/openwisp_firmware_upgrader/tests/test_tasks.py b/openwisp_firmware_upgrader/tests/test_tasks.py index 4d98d7cd2..6b03ee9d1 100644 --- a/openwisp_firmware_upgrader/tests/test_tasks.py +++ b/openwisp_firmware_upgrader/tests/test_tasks.py @@ -1,3 +1,4 @@ +import uuid from unittest import mock from celery.exceptions import SoftTimeLimitExceeded @@ -6,12 +7,16 @@ from openwisp_utils.tests import capture_any_output from .. import tasks +from ..extractors.exceptions import DecompressionLimitExceeded, UnsupportedImageError from ..swapper import load_model from .base import TestUpgraderMixin BatchUpgradeOperation = load_model("BatchUpgradeOperation") +FirmwareImage = load_model("FirmwareImage") UpgradeOperation = load_model("UpgradeOperation") +_MOCK_EXTRACTOR = "openwisp_firmware_upgrader.tasks.OpenWrtMetadataExtractor" + class TestTasks(TestUpgraderMixin, TransactionTestCase): _mock_upgrade = "openwisp_firmware_upgrader.upgraders.openwrt.OpenWrt.upgrade" @@ -66,3 +71,194 @@ def test_batch_upgrade_operation_resilience(self, mocked_logger, *args): mocked_logger.assert_called_with( f"The BatchUpgradeOperation object with id {batch_id} has been deleted" ) + batch_id = BatchUpgradeOperation().id + tasks.batch_upgrade_operation.run(batch_id=batch_id, firmwareless=False) + mocked_logger.assert_called_with( + f"The BatchUpgradeOperation object with id {batch_id} has been deleted" + ) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_success(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.return_value = { + "model": "TP-Link WDR4300", + "compatible": ["tplink,tl-wdr4300-v1"], + "target": "ath79/generic", + "version": "23.05.5", + "compat_version": "1.0", + "source": "fwtool", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_SUCCESS) + self.assertEqual(image.board, "TP-Link WDR4300") + self.assertEqual(image.target, "ath79/generic") + self.assertEqual(image.source, "fwtool") + self.assertIn("success", image.extraction_log) + self.assertEqual(image.fw_version, "23.05.5") + self.assertEqual(image.compat_version, "1.0") + self.assertEqual(image.compatible, ["tplink,tl-wdr4300-v1"]) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_dtb_fallback(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.return_value = { + "model": "Xunlong Orange Pi Zero", + "compatible": ["xunlong,orangepi-zero"], + "target": "", + "version": "", + "compat_version": "1.0", + "source": "dtb", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_SUCCESS) + self.assertEqual(image.source, "dtb") + self.assertEqual(image.board, "Xunlong Orange Pi Zero") + self.assertEqual(image.target, "") + self.assertEqual(image.compatible, ["xunlong,orangepi-zero"]) + self.assertEqual(image.compat_version, "1.0") + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_unsupported_error(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = UnsupportedImageError( + "armsr image type not supported" + ) + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_FAILED) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_UNSUPPORTED) + self.assertIn("Extraction failed", image.extraction_log) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_decompression_limit(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = DecompressionLimitExceeded( + "exceeded max decompressed size" + ) + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_FAILED) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_OOM) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_timeout(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = SoftTimeLimitExceeded() + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_FAILED) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_TIMEOUT) + + @mock.patch(_MOCK_EXTRACTOR) + @capture_any_output() + def test_extract_firmware_metadata_invalid_exception(self, *args): + MockExtractor = args[0] + MockExtractor.return_value.extract.side_effect = RuntimeError("unexpected") + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_INVALID) + self.assertEqual(image.failure_reason, FirmwareImage.FAILURE_INVALID) + + @mock.patch("logging.Logger.warning") + def test_extract_firmware_metadata_image_not_found(self, mock_warning): + fake_pk = str(uuid.uuid4()) + tasks.extract_firmware_metadata.run(fake_pk) + mock_warning.assert_called_once() + self.assertTrue(any(fake_pk in str(arg) for arg in mock_warning.call_args.args)) + + @mock.patch(_MOCK_EXTRACTOR) + def test_extract_firmware_metadata_skips_non_unconfirmed(self, MockExtractor): + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_IN_PROGRESS + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + MockExtractor.assert_not_called() + image.refresh_from_db() + self.assertEqual(image.extraction_status, FirmwareImage.STATUS_IN_PROGRESS) + + def test_compat_blocks_pairing_above_1_0(self): + self.assertTrue(tasks._compat_blocks_pairing("1.1")) + self.assertTrue(tasks._compat_blocks_pairing("2.0")) + + def test_compat_blocks_pairing_at_or_below_1_0(self): + self.assertFalse(tasks._compat_blocks_pairing("1.0")) + self.assertFalse(tasks._compat_blocks_pairing("0.9")) + + def test_compat_blocks_pairing_invalid_values(self): + self.assertFalse(tasks._compat_blocks_pairing("")) + self.assertFalse(tasks._compat_blocks_pairing(None)) + self.assertFalse(tasks._compat_blocks_pairing("bad")) + + @mock.patch(_MOCK_EXTRACTOR) + @mock.patch("openwisp_firmware_upgrader.tasks.create_all_device_firmwares") + @capture_any_output() + def test_extract_firmware_metadata_skips_pairing_for_high_compat( + self, mock_create_firmwares, MockExtractor + ): + MockExtractor.return_value.extract.return_value = { + "model": "Test Device", + "compatible": ["test,device"], + "target": "test/target", + "version": "23.05.5", + "compat_version": "2.0", + "source": "fwtool", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + mock_create_firmwares.delay.assert_not_called() + + @mock.patch(_MOCK_EXTRACTOR) + @mock.patch("openwisp_firmware_upgrader.tasks.create_all_device_firmwares") + @capture_any_output() + def test_extract_firmware_metadata_triggers_pairing_for_low_compat( + self, mock_create_firmwares, MockExtractor + ): + MockExtractor.return_value.extract.return_value = { + "model": "Test Device", + "compatible": ["test,device"], + "target": "test/target", + "version": "23.05.5", + "compat_version": "1.0", + "source": "fwtool", + } + image = self._create_firmware_image() + FirmwareImage.objects.filter(pk=image.pk).update( + extraction_status=FirmwareImage.STATUS_UNCONFIRMED + ) + tasks.extract_firmware_metadata.run(str(image.pk)) + mock_create_firmwares.delay.assert_called_once_with(str(image.pk)) diff --git a/tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py b/tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py new file mode 100644 index 000000000..07d46c81b --- /dev/null +++ b/tests/openwisp2/sample_firmware_upgrader/migrations/0005_build_status_firmwareimage_board_and_more.py @@ -0,0 +1,100 @@ +# Generated by Django 5.2.13 on 2026-05-22 22:08 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_firmware_upgrader", "0004_alter_firmwareimage_file"), + ] + + operations = [ + migrations.AddField( + model_name="build", + name="status", + field=models.CharField( + choices=[ + ("analyzing", "Analyzing"), + ("success", "Success"), + ("failed", "Failed"), + ("invalid", "Invalid"), + ("manually_confirmed", "Manually Confirmed"), + ], + db_index=True, + default="analyzing", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200), + ), + migrations.AddField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField(blank=True, max_length=10), + ), + migrations.AddField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True), + ), + migrations.AddField( + model_name="firmwareimage", + name="extraction_status", + field=models.CharField( + choices=[ + ("unconfirmed", "Unconfirmed"), + ("in_progress", "In Progress"), + ("success", "Success"), + ("failed", "Failed"), + ("manually_confirmed", "Manually Confirmed"), + ("invalid", "Invalid"), + ], + db_index=True, + default="unconfirmed", + max_length=20, + verbose_name="extraction status", + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="fw_version", + field=models.CharField( + blank=True, max_length=50, verbose_name="firmware version" + ), + ), + migrations.AddField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20), + ), + migrations.AddField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100), + ), + ] diff --git a/tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py b/tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py new file mode 100644 index 000000000..6b7557b0b --- /dev/null +++ b/tests/openwisp2/sample_firmware_upgrader/migrations/0006_alter_firmwareimage_board_and_more.py @@ -0,0 +1,61 @@ +# Generated by Django 5.2.15 on 2026-06-17 08:23 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_firmware_upgrader", "0005_build_status_firmwareimage_board_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="firmwareimage", + name="board", + field=models.CharField(blank=True, max_length=200, verbose_name="board"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compat_version", + field=models.CharField( + blank=True, max_length=10, verbose_name="compat version" + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="compatible", + field=models.JSONField(blank=True, default=list, verbose_name="compatible"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="extraction_log", + field=models.TextField(blank=True, verbose_name="extraction log"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="failure_reason", + field=models.CharField( + blank=True, + choices=[ + ("unsupported_format", "Unsupported format"), + ("out_of_memory", "Out of memory"), + ("invalid_file", "Invalid file"), + ("timeout", "Extraction timed out"), + ], + default="", + max_length=20, + verbose_name="failure reason", + ), + ), + migrations.AlterField( + model_name="firmwareimage", + name="source", + field=models.CharField(blank=True, max_length=20, verbose_name="source"), + ), + migrations.AlterField( + model_name="firmwareimage", + name="target", + field=models.CharField(blank=True, max_length=100, verbose_name="target"), + ), + ]