diff --git a/docs/developer/extending.rst b/docs/developer/extending.rst index ce8d74706..01cb53c04 100644 --- a/docs/developer/extending.rst +++ b/docs/developer/extending.rst @@ -342,6 +342,7 @@ Once you have created the models, add the following to your CONFIG_TEMPLATE_MODEL = "sample_config.Template" CONFIG_VPN_MODEL = "sample_config.Vpn" CONFIG_VPNCLIENT_MODEL = "sample_config.VpnClient" + CONFIG_DEVICECERTIFICATE_MODEL = "sample_config.DeviceCertificate" CONFIG_ORGANIZATIONCONFIGSETTINGS_MODEL = "sample_config.OrganizationConfigSettings" CONFIG_ORGANIZATIONLIMITS_MODEL = "sample_config.OrganizationLimits" CONFIG_WHOISINFO_MODEL = "sample_config.WHOISInfo" diff --git a/docs/index.rst b/docs/index.rst index ab621e8f8..68bd76150 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -37,6 +37,7 @@ the OpenWISP architecture. user/intro.rst user/device-config-status.rst user/templates.rst + user/certificate-templates.rst user/variables.rst user/device-groups.rst user/push-operations.rst diff --git a/docs/user/certificate-templates.rst b/docs/user/certificate-templates.rst new file mode 100644 index 000000000..c64ccc95b --- /dev/null +++ b/docs/user/certificate-templates.rst @@ -0,0 +1,223 @@ +X.509 Certificate Generator Templates +===================================== + +.. contents:: **Table of Contents**: + :depth: 3 + :local: + +Introduction +------------ + +A Certificate Template is a specific type of :doc:`Configuration Template +` that allows OpenWISP to centrally manage and +automatically provision X.509 client certificates for devices. + +Unlike VPN templates, which generate certificates as part of a larger +tunnel configuration (like OpenVPN or WireGuard), Certificate Templates +are standalone. They are ideal for use cases where devices need +cryptographic identities for external services, such as: + +- Mutual TLS (mTLS) authentication against internal APIs. +- Cryptographically signed device identities for 802.1x or captive + portals. +- Secure payload signing. + +.. _certificate_templates_setup: + +Setting Up a Certificate Template +--------------------------------- + +To create a Certificate Template, navigate to the Templates section in the +OpenWISP admin and set the **Type** to :guilabel:`Certificate` (``cert``). +This will reveal the certificate-specific configuration fields. + +.. image:: https://raw.githubusercontent.com/openwisp/openwisp-controller/docs/docs/1.4/certificate-templates/certificate-template.png + :target: https://raw.githubusercontent.com/openwisp/openwisp-controller/docs/docs/1.4/certificate-templates/certificate-template.png + :alt: Certificate Template admin form + +:guilabel:`Certificate Authority` (required) + The Certificate Authority that will sign the X.509 certificates + generated for each device using the template. The CA must belong to + the same organization as the template (or be shared). + +:guilabel:`Blueprint Certificate` (optional) + An existing, unassigned, non-revoked X.509 certificate that will be + used as a *blueprint*. The extensions, key length, digest, and other + subject fields of the blueprint are copied to each newly generated + certificate. The blueprint must be signed by the selected + :guilabel:`Certificate Authority` and must not already be bound to a + device. + +:guilabel:`Automatic certificate provisioning` (``auto_cert``) + When enabled (which is the default behavior), an X.509 certificate is + automatically created and signed by the template's CA the moment the + template is assigned to a device configuration. + +.. _certificate_templates_validation: + +Validation Rules +---------------- + +To ensure cryptographic integrity and prevent misconfigurations, +Certificate Templates enforce the following validation rules upon saving: + +- A :guilabel:`Certificate Authority` is strictly required. +- The :guilabel:`Blueprint Certificate` **must** be signed by the selected + :guilabel:`Certificate Authority`. Saving with a mismatched pair will + fail. +- The :guilabel:`Blueprint Certificate` must be *unassigned* (it cannot be + currently bound to any device). The blueprint dropdown in the admin + panel is pre-filtered to show only unassigned, non-revoked certificates. +- Both the :guilabel:`Certificate Authority` and the :guilabel:`Blueprint + Certificate` must belong to the same organization as the template, or be + marked as shared. + +.. _certificate_templates_lifecycle: + +Provisioning and Revocation Lifecycle +------------------------------------- + +Certificate Templates are tightly coupled with the device configuration +lifecycle to ensure certificates are only valid while the device is +actively authorized to use them. + +**When assigned to a device:** When a Certificate Template is added to a +device configuration, a ``DeviceCertificate`` relationship is established. +If ``auto_cert`` is enabled, an X.509 certificate is generated and signed +in the same database transaction: + +- The certificate's subject and extensions are copied from the + :guilabel:`Blueprint Certificate` (or the CA defaults). +- The certificate is augmented with two custom OpenWISP OIDs that + cryptographically identify the device (see + :ref:`certificate_templates_oid_extensions`). +- The certificate's :guilabel:`Common Name` is generated using the + :ref:`OPENWISP_CONTROLLER_COMMON_NAME_FORMAT + ` setting, suffixed with a + unique slug to prevent collisions. + +**When removed from a device:** When the Certificate Template is +unassigned from a device configuration, the ``DeviceCertificate`` +relationship is deleted. Crucially, the underlying X.509 certificate is +**automatically revoked** (provided it was created via +``auto_cert=True``). This ensures that compromised or decommissioned +devices immediately lose their cryptographic access. + +.. _certificate_templates_active_lock: + +Active Template Mutation Lock +----------------------------- + +To prevent breaking the cryptographic binding with devices that are +already using a template, certain destructive changes are blocked while +the template is assigned to *active* or *activating* device +configurations. + +You **cannot** change the following on an actively used template: + +- :guilabel:`Type` (only changing a ``cert`` template to a different type + is blocked) +- :guilabel:`Certificate Authority` +- :guilabel:`Blueprint Certificate` + +If you need to update these core parameters, you must first unassign the +Certificate Template from all affected device configurations (or +deactivate the devices), apply your template changes, and then reassign +them. + +.. _certificate_templates_oid_extensions: + +Custom Device Identification OIDs +--------------------------------- + +To allow external systems to uniquely and securely identify devices by +parsing their X.509 certificates, every automatically generated +certificate includes two custom ASN.1 Object Identifiers (OIDs): + +- ``1.3.6.1.4.1.65901.1``: Contains the MAC address of the device + (``ASN1:UTF8:string:``). +- ``1.3.6.1.4.1.65901.2``: Contains the UUID of the device + (``ASN1:UTF8:string:``). + +These OIDs are appended securely to the certificate in addition to any +extensions inherited from the :guilabel:`Blueprint Certificate`. + +.. _certificate_templates_context: + +Using Certificates in Configuration (Context Injection) +------------------------------------------------------- + +To deploy the certificate to the device, administrators must reference the +automatically generated variables within the template's JSON configuration +payload. + +To prevent variable collisions when multiple Certificate Templates are +assigned to a single device, OpenWISP namespaces the Jinja2 variables +using the template's 32-character hexadecimal UUID (the UUID with dashes +removed). + +The following variables are dynamically injected into the configuration +context: + +- ``{{ cert__pem }}``: The public certificate. +- ``{{ cert__key }}``: The private key. +- ``{{ cert__uuid }}``: The UUID of the generated + certificate. +- ``{{ cert__path }}``: The file system path where the + certificate file will be installed on the device. +- ``{{ cert__key_path }}``: The file system path where + the private key file will be installed on the device. + +**Workflow Example:** + +1. Create a Certificate Template and click **Save and continue editing** + to generate its UUID. +2. Note the UUID from the URL (e.g., + ``d7d20eb8-b3c8-420c-9103-401e7960cfb3``). +3. Strip the dashes to get the hex string + (``d7d20eb8b3c8420c9103401e7960cfb3``). +4. Add the following JSON payload to write the certificate to the device's + filesystem: + +.. code-block:: json + + { + "files": [ + { + "path": "/etc/ssl/certs/device-cert.pem", + "mode": "0600", + "contents": "{{ cert_d7d20eb8b3c8420c9103401e7960cfb3_pem }}" + }, + { + "path": "/etc/ssl/private/device-key.pem", + "mode": "0600", + "contents": "{{ cert_d7d20eb8b3c8420c9103401e7960cfb3_key }}" + } + ] + } + +When this template is assigned to a device, OpenWISP will automatically +replace the variables with the exact, unique X.509 certificate and private +key generated for that specific device. + +.. _certificate_templates_api: + +Certificate Templates via the REST API +-------------------------------------- + +The Certificate Template architecture is fully supported by the +:doc:`OpenWISP REST API `: + +- The ``type`` field accepts the ``cert`` enumeration. +- ``ca`` and ``blueprint_cert`` are writable fields on the + ``/api/v1/controller/template/`` endpoint. +- The ``blueprint_cert`` field is strictly filtered. Attempting to pass + the UUID of an already-assigned or revoked certificate will return a + ``400 Bad Request``. +- The Active Mutation Lock is enforced at the serializer level: attempting + to patch the ``type``, ``ca``, or ``blueprint_cert`` of an actively + deployed template will return a ``400 Bad Request``. + +Additionally, you can trigger the automated creation and revocation +lifecycle by patching the ``config.templates`` array on the :ref:`Device +endpoint `. diff --git a/docs/user/intro.rst b/docs/user/intro.rst index 1c4b123ae..4eb2a0d7b 100644 --- a/docs/user/intro.rst +++ b/docs/user/intro.rst @@ -35,6 +35,8 @@ following features: - **VPN management**: automatically provision VPN tunnel configurations, including cryptographic keys and IP addresses, e.g.: :doc:`OpenVPN `, :doc:`WireGuard ` +- :doc:`Certificate Templates `: automatically + generate and manage X.509 client certificates for devices - :doc:`whois`: display information about the public IP address used by devices to communicate with OpenWISP - :doc:`import-export` diff --git a/docs/user/rest-api.rst b/docs/user/rest-api.rst index c9702920d..9c2131a78 100644 --- a/docs/user/rest-api.rst +++ b/docs/user/rest-api.rst @@ -1112,7 +1112,7 @@ You can filter a list of templates based on their backend using the GET /api/v1/controller/template/?backend={backend} You can filter a list of templates based on their type using the ``type`` -(e.g. vpn or generic). +(e.g. vpn, cert or generic). .. code-block:: text diff --git a/docs/user/settings.rst b/docs/user/settings.rst index aa59425ef..36dd87e59 100644 --- a/docs/user/settings.rst +++ b/docs/user/settings.rst @@ -308,9 +308,9 @@ levels of OpenWISP, see `netjsonconfig context: configuration variables The default value of the ``auto_cert`` field for new ``Template`` objects. The ``auto_cert`` field is valid only for templates which have ``type`` -set to ``VPN`` and indicates whether configuration regarding the VPN -tunnel is provisioned automatically to each device using the template, -e.g.: +set to ``VPN`` or ``cert`` and indicates whether configuration regarding +the VPN tunnel (or the x509 certificate) is provisioned automatically to +each device using the template, e.g.: - when using OpenVPN, new `x509 `_ certificates will be generated automatically using the same CA assigned @@ -330,6 +330,24 @@ The objects that are automatically created will also be removed when they are not needed anymore (e.g.: when the VPN template is removed from a configuration object). +``OPENWISP_CONTROLLER_REGENERATE_CERTS_ON_HARDWARE_CHANGE`` +----------------------------------------------------------- + +============ ======== +**type**: ``bool`` +**default**: ``True`` +============ ======== + +When a device's name or MAC address changes, OpenWISP automatically +revokes the existing X.509 client certificates and provisions new ones +with the updated identity attributes. Set this to ``False`` to disable +this automatic regeneration. + +.. note:: + + The regeneration only applies to certificates created via Certificate + Templates with ``auto_cert`` enabled. + ``OPENWISP_CONTROLLER_CERT_PATH`` --------------------------------- @@ -350,13 +368,13 @@ default). **default**: ``{mac_address}-{name}`` ============ ======================== -Defines the format of the ``common_name`` attribute of VPN client -certificates that are automatically created when using VPN templates which -have ``auto_cert`` set to ``True``. A unique slug generated using -`shortuuid `_ is appended to -the common name to introduce uniqueness. Therefore, resulting common names -will have ``{OPENWISP_CONTROLLER_COMMON_NAME_FORMAT}-{unique-slug}`` -format. +Defines the format of the ``common_name`` attribute of X.509 client +certificates that are automatically created when using VPN or Certificate +Templates which have ``auto_cert`` set to ``True``. A unique slug +generated using `shortuuid `_ +is appended to the common name to introduce uniqueness. Therefore, +resulting common names will have +``{OPENWISP_CONTROLLER_COMMON_NAME_FORMAT}-{unique-slug}`` format. .. note:: diff --git a/docs/user/templates.rst b/docs/user/templates.rst index a58530b1b..f46796ed1 100644 --- a/docs/user/templates.rst +++ b/docs/user/templates.rst @@ -209,3 +209,21 @@ engine: netjsonconfig. For more advanced technical information about templates, consult the netjsonconfig documentation: `Basic Concepts, Template `_. + +.. _certificate_templates: + +Certificate Templates +--------------------- + +A Certificate Template is a :doc:`Template ` +whose **Type** is set to :guilabel:`Certificate` (``cert``). See +:doc:`/controller/user/certificate-templates` for detailed documentation. + +It allows declaring the *Certificate Authority* and an optional *Blueprint +Certificate* that will be used to issue an X.509 certificate for each +device the template is assigned to, without needing a VPN backend. + +Certificate Templates are useful for any use case in which a fleet of +devices needs an X.509 client certificate managed centrally by OpenWISP, +for example: mutual TLS authentication against an internal service, signed +device identities for captive portals, etc. diff --git a/openwisp_controller/config/admin.py b/openwisp_controller/config/admin.py index 2140b0336..3d8d5f4a5 100644 --- a/openwisp_controller/config/admin.py +++ b/openwisp_controller/config/admin.py @@ -117,7 +117,11 @@ class Media: css = {"all": (f"{prefix}css/admin.css",)} js = list(CopyableFieldsAdmin.Media.js) + [ f"{prefix}js/{file_}" - for file_ in ("preview.js", "unsaved_changes.js", "switcher.js") + for file_ in ( + "preview.js", + "unsaved_changes.js", + "switcher.js", + ) ] def get_extra_context(self, pk=None): @@ -1060,6 +1064,8 @@ class TemplateAdmin(MultitenantAdminMixin, BaseConfigAdmin, SystemDefinedVariabl "organization", "type", "backend", + "ca", + "blueprint_cert", "default", "required", "created", @@ -1074,13 +1080,15 @@ class TemplateAdmin(MultitenantAdminMixin, BaseConfigAdmin, SystemDefinedVariabl "created", ] search_fields = ["name"] - multitenant_shared_relations = ("vpn",) + multitenant_shared_relations = ("vpn", "ca", "blueprint_cert") fields = [ "name", "organization", "type", "backend", "vpn", + "ca", + "blueprint_cert", "auto_cert", "tags", "default", @@ -1092,7 +1100,7 @@ class TemplateAdmin(MultitenantAdminMixin, BaseConfigAdmin, SystemDefinedVariabl "modified", ] readonly_fields = ["system_context"] - autocomplete_fields = ["vpn"] + autocomplete_fields = ["vpn", "ca", "blueprint_cert"] @admin.action(permissions=["add"]) def clone_selected_templates(self, request, queryset): diff --git a/openwisp_controller/config/api/serializers.py b/openwisp_controller/config/api/serializers.py index cf5d26c25..0dba8035f 100644 --- a/openwisp_controller/config/api/serializers.py +++ b/openwisp_controller/config/api/serializers.py @@ -17,6 +17,7 @@ DeviceGroup = load_model("config", "DeviceGroup") Config = load_model("config", "Config") Organization = load_model("openwisp_users", "Organization") +DeviceCertificate = load_model("config", "DeviceCertificate") class BaseMeta: @@ -38,6 +39,8 @@ class Meta(BaseMeta): "type", "backend", "vpn", + "ca", + "blueprint_cert", "tags", "default", "required", @@ -46,6 +49,16 @@ class Meta(BaseMeta): "created", "modified", ] + extra_kwargs = { + "blueprint_cert": { + "error_messages": { + "does_not_exist": _( + "This certificate does not exist or is already " + "assigned to a device configuration profile." + ) + } + } + } def validate_vpn(self, value): """ @@ -62,12 +75,102 @@ def validate_config(self, value): """ Display appropriate field name. """ - if self.initial_data.get("type") == "generic" and value == {}: + template_type = self.initial_data.get( + "type", getattr(self.instance, "type", None) + ) + if template_type == "generic" and value == {}: raise serializers.ValidationError( _("The configuration field cannot be empty.") ) return value + def validate(self, data): + """ + Explicitly validate certificate template fields and locks for the API. + """ + template_type = data.get("type", getattr(self.instance, "type", "generic")) + ca = data.get("ca", getattr(self.instance, "ca", None)) + blueprint_cert = data.get( + "blueprint_cert", getattr(self.instance, "blueprint_cert", None) + ) + # cert templates must have a CA + if template_type == "cert" and not ca: + raise serializers.ValidationError( + { + "ca": _( + "A Certificate Authority is required when " + "the template type is certificate." + ) + } + ) + # clear certificate-specific fields if the template is not a certificate + elif template_type != "cert": + data["ca"] = None + data["blueprint_cert"] = None + ca = None + blueprint_cert = None + # assert structural binding matches between CA and template blueprints + if template_type == "cert" and blueprint_cert and ca: + if blueprint_cert.ca_id != ca.id: + raise serializers.ValidationError( + { + "blueprint_cert": _( + "The selected certificate must match " + "the selected Certificate Authority." + ) + } + ) + # apply mutation protections over protected fields + if ( + self.instance + and self.instance.pk + and ("ca" in data or "blueprint_cert" in data or "type" in data) + ): + # only enforce locks if the template is assigned + # to active/activating devices + if ( + Config.objects.filter(templates=self.instance) + .exclude(status__in=["deactivating", "deactivated"]) + .exists() + ): + # block changing a certificate template to a generic template + if self.instance.type == "cert" and template_type != "cert": + raise serializers.ValidationError( + { + "type": _( + "This template is already assigned to active devices. " + "You cannot change the template type from certificate " + "on an active template." + ) + } + ) + # block altering the assigned Certificate Authority + if "ca" in data and data["ca"] != self.instance.ca: + raise serializers.ValidationError( + { + "ca": _( + "This template is already assigned to active devices. " + "You cannot change the CA or Blueprint Certificate " + "on an active template." + ) + } + ) + # block altering the assigned Blueprint Certificate + if ( + "blueprint_cert" in data + and data["blueprint_cert"] != self.instance.blueprint_cert + ): + raise serializers.ValidationError( + { + "blueprint_cert": _( + "This template is already assigned to active devices. " + "You cannot change the CA or Blueprint Certificate " + "on an active template." + ) + } + ) + return super().validate(data) + class VpnSerializer(BaseSerializer): config = serializers.JSONField(initial={}) @@ -218,6 +321,9 @@ def _update_config(self, device, config_data): vpn_list = config.templates.filter(type="vpn").values_list("vpn") if vpn_list: config.vpnclient_set.exclude(vpn__in=vpn_list).delete() + DeviceCertificate.objects.filter(config=config).exclude( + template_id__in=config_templates + ).delete() config.templates.set(config_templates, clear=True) config.save() except ValidationError as error: @@ -331,15 +437,19 @@ def update(self, instance, validated_data): # The value of the organization field is set here to # prevent access of the old value stored in the database # while performing above operations. - instance.config.device.organization = validated_data.get("organization") - instance.config.templates.clear() - Config.enforce_required_templates( - action="post_clear", - instance=instance.config, - sender=instance.config.templates, - pk_set=None, - raw_data=raw_data_for_signal_handlers, - ) + with transaction.atomic(): + instance.config.device.organization = validated_data.get( + "organization" + ) + DeviceCertificate.objects.filter(config=instance.config).delete() + instance.config.templates.clear() + Config.enforce_required_templates( + action="post_clear", + instance=instance.config, + sender=instance.config.templates, + pk_set=None, + raw_data=raw_data_for_signal_handlers, + ) return super().update(instance, validated_data) diff --git a/openwisp_controller/config/apps.py b/openwisp_controller/config/apps.py index 4c7e233f4..0de65ed40 100644 --- a/openwisp_controller/config/apps.py +++ b/openwisp_controller/config/apps.py @@ -69,6 +69,7 @@ def __setmodels__(self): self.org_limits = load_model("config", "OrganizationLimits") self.cert_model = load_model("django_x509", "Cert") self.org_model = load_model("openwisp_users", "Organization") + self.devicecert_model = load_model("config", "DeviceCertificate") def connect_signals(self): """ @@ -94,6 +95,11 @@ def connect_signals(self): sender=self.config_model.templates.through, dispatch_uid="config.manage_vpn_clients", ) + m2m_changed.connect( + self.config_model.manage_device_certs, + sender=self.config_model.templates.through, + dispatch_uid="config.manage_device_certs", + ) m2m_changed.connect( self.config_model.templates_changed, sender=self.config_model.templates.through, @@ -115,6 +121,11 @@ def connect_signals(self): sender=self.vpnclient_model, dispatch_uid="vpnclient.post_delete", ) + post_delete.connect( + self.devicecert_model.post_delete, + sender=self.devicecert_model, + dispatch_uid="devicecert.post_delete", + ) vpn_peers_changed.connect( self.vpn_model.update_vpn_server_configuration, sender=self.vpn_model, diff --git a/openwisp_controller/config/base/config.py b/openwisp_controller/config/base/config.py index 7b2412b8f..7a4b1d503 100644 --- a/openwisp_controller/config/base/config.py +++ b/openwisp_controller/config/base/config.py @@ -63,6 +63,13 @@ class AbstractConfig(ChecksumCacheMixin, BaseConfig): related_name="vpn_relations", blank=True, ) + device_certificates = models.ManyToManyField( + get_model_name("config", "Template"), + through=get_model_name("config", "DeviceCertificate"), + related_name="config_device_certificates", + blank=True, + verbose_name=_("device certificates"), + ) STATUS = Choices("modified", "applied", "error", "deactivating", "deactivated") status = StatusField( @@ -466,13 +473,19 @@ def enforce_required_templates( @classmethod def certificate_updated(cls, instance, created, **kwargs): + DeviceCertificate = load_model("config", "DeviceCertificate") if created or instance.revoked: return + configs_to_update = set() try: - config = instance.vpnclient.config + configs_to_update.add(instance.vpnclient.config) except ObjectDoesNotExist: - return - else: + pass + for dc in DeviceCertificate.objects.filter(cert=instance).select_related( + "config" + ): + configs_to_update.add(dc.config) + for config in configs_to_update: transaction.on_commit(config.update_status_if_checksum_changed) @classmethod @@ -486,6 +499,53 @@ def register_context_function(cls, func): if func not in cls._config_context_functions: cls._config_context_functions.append(func) + @classmethod + def manage_device_certs(cls, sender, instance, action, pk_set, **kwargs): + """ + Syncs DeviceCertificate objects when templates are added/removed + """ + # ignore signals during initial creation or for irrelevant M2M actions + if instance._state.adding or action not in [ + "post_add", + "post_remove", + "post_clear", + ]: + return + # handle full cleanup if the device configuration profile is being wiped + if action == "post_clear": + if instance.is_deactivating_or_deactivated(): + instance.devicecertificate_set.all().delete() + else: + # If this is a reorder, the subsequent 'add' will complete first, + # saving the certs. If it is a pure clear, the certs will be deleted. + transaction.on_commit( + lambda: instance.devicecertificate_set.exclude( + template_id__in=instance.templates.values_list("id", flat=True) + ).delete() + ) + return + # normalize templates across standard M2M sets vs Admin ModelForm querysets + if isinstance(pk_set, set): + template_model = cls.get_template_model() + templates = template_model.objects.filter(pk__in=list(pk_set)).order_by( + "created" + ) + else: + templates = pk_set + # deletes orphaned certificates that are no + # longer assigned in the templates list. + if len(pk_set) != templates.filter(required=True).count(): + instance.devicecertificate_set.exclude( + template_id__in=instance.templates.values_list("id", flat=True) + ).delete() + # allocate new DeviceCertificate associations + # for newly added certificate templates + if action == "post_add": + for template in templates.filter(type="cert"): + instance.devicecertificate_set.get_or_create( + template=template, defaults={"auto_cert": template.auto_cert} + ) + def get_default_templates(self): """ retrieves default templates of a Config object @@ -915,6 +975,33 @@ def get_vpn_context(self): context[vpn_context_keys["secret"]] = vpnclient.secret return context + def get_cert_context(self): + """ + Retrieves standalone certificates generated by Certificate Templates + and exposes them as UUID-namespaced variables for the configuration engine. + """ + cert_context = collections.OrderedDict() + for dc in self.devicecertificate_set.select_related( + "template", "cert" + ).order_by("created"): + if dc.cert: + template_hex = dc.template_id.hex + prefix = f"cert_{template_hex}" + cert_filename = "cert-{0}.pem".format(template_hex) + cert_path = "{0}/{1}".format(app_settings.CERT_PATH, cert_filename) + key_filename = "key-{0}.pem".format(template_hex) + key_path = "{0}/{1}".format(app_settings.CERT_PATH, key_filename) + cert_context.update( + { + f"{prefix}_path": cert_path, + f"{prefix}_pem": dc.cert.certificate, + f"{prefix}_key_path": key_path, + f"{prefix}_key": dc.cert.private_key, + f"{prefix}_uuid": str(dc.cert.id), + } + ) + return cert_context + def get_context(self, system=False): """ additional context passed to netjsonconfig @@ -942,6 +1029,7 @@ def get_context(self, system=False): context.update(self.device._get_group().get_context()) # Add predefined variables context.update(self.get_vpn_context()) + context.update(self.get_cert_context()) for func in self._config_context_functions: context.update(func(config=self)) if app_settings.HARDWARE_ID_ENABLED: diff --git a/openwisp_controller/config/base/device_certificate.py b/openwisp_controller/config/base/device_certificate.py new file mode 100644 index 000000000..6faeec721 --- /dev/null +++ b/openwisp_controller/config/base/device_certificate.py @@ -0,0 +1,175 @@ +import copy + +import shortuuid +from django.core.exceptions import ObjectDoesNotExist, ValidationError +from django.db import models, transaction +from django.utils.translation import gettext_lazy as _ +from swapper import get_model_name, load_model + +from openwisp_controller.config import settings as app_settings +from openwisp_utils.base import TimeStampedEditableModel + +MAC_ADDRESS_OID = "1.3.6.1.4.1.65901.1" +DEVICE_UUID_OID = "1.3.6.1.4.1.65901.2" + + +class AbstractDeviceCertificate(TimeStampedEditableModel): + config = models.ForeignKey( + get_model_name("config", "Config"), on_delete=models.CASCADE + ) + template = models.ForeignKey( + get_model_name("config", "Template"), on_delete=models.CASCADE + ) + cert = models.OneToOneField( + get_model_name("django_x509", "Cert"), + on_delete=models.CASCADE, + blank=True, + null=True, + ) + auto_cert = models.BooleanField(default=False) + + class Meta: + abstract = True + unique_together = ("config", "template") + verbose_name = _("Device certificate") + verbose_name_plural = _("Device certificates") + + def __str__(self): + cert_name = self.cert.name if self.cert else str(_("Pending Generation")) + return f"{self.config.device.name} - {cert_name}" + + def clean(self): + Template = load_model("config", "Template") + if ( + self.cert_id + and Template.objects.filter(blueprint_cert_id=self.cert_id).exists() + ): + raise ValidationError( + { + "cert": _( + "This certificate is currently used as a blueprint " + "by a template and cannot be directly assigned to a device." + ) + } + ) + super().clean() + + def save(self, *args, **kwargs): + """Performs automatic provisioning if ``auto_cert`` is True.""" + with transaction.atomic(): + if self.auto_cert and not self.cert: + self._auto_x509() + self.full_clean(validate_unique=False) + super().save(*args, **kwargs) + + def _auto_x509(self): + """ + Automatically creates an x509 certificate. + """ + if self.cert: + return + cn = self._get_common_name() + self._auto_create_cert(name=self.config.device.name, common_name=cn) + + def _get_common_name(self): + """ + Returns a unique common name for a new certificate, mirroring VPN client logic. + """ + d = self.config.device + end = 63 - len(d.mac_address) + truncated_name = d.name[:end] + unique_slug = shortuuid.ShortUUID().random(length=8) + cn_format = app_settings.COMMON_NAME_FORMAT + if cn_format == "{mac_address}-{name}" and truncated_name == d.mac_address: + cn_format = "{mac_address}" + format_dict = {**d.__dict__, "name": truncated_name} + common_name = cn_format.format(**format_dict)[:55] + common_name = f"{common_name}-{unique_slug}" + return common_name + + def _build_cert(self, name, common_name): + """Build (but do not save) a Cert instance from template + blueprint.""" + ca = self.template.ca + blueprint = self.template.blueprint_cert + cert_model = self.__class__.cert.field.related_model + + attrs = self._clone_blueprint_attrs(ca, blueprint) + extensions = self._build_extensions(blueprint) + cert = cert_model( + name=name, + ca=ca, + common_name=common_name, + extensions=extensions, + **attrs, + ) + return self._auto_create_cert_extra(cert) + + def _clone_blueprint_attrs(self, ca, blueprint): + """ + Extracts base X.509 attributes (such as key length, digest, and + location data) from the provided blueprint certificate. + """ + source = blueprint or ca + digest = str(source.digest) if not blueprint else source.digest + return dict( + key_length=source.key_length, + digest=digest, + country_code=source.country_code, + state=source.state, + city=source.city, + organization_name=source.organization_name, + email=source.email, + ) + + def _build_extensions(self, blueprint): + """Compiles the list of X.509 extensions for the new certificate.""" + if blueprint and blueprint.extensions: + extensions = copy.deepcopy(blueprint.extensions) + else: + extensions = [{"name": "nsCertType", "value": "client", "critical": False}] + extensions.extend(self._get_hardware_oid_extensions()) + return extensions + + def _get_hardware_oid_extensions(self): + device = self.config.device + return [ + { + "oid": MAC_ADDRESS_OID, + "value": f"ASN1:UTF8:string:{device.mac_address}", + "critical": False, + }, + { + "oid": DEVICE_UUID_OID, + "value": f"ASN1:UTF8:string:{device.id}", + "critical": False, + }, + ] + + def _auto_create_cert(self, name, common_name): + """ + Automatically creates and assigns a client x509 certificate + """ + cert = self._build_cert(name=name, common_name=common_name) + cert.full_clean() + cert.save() + self.cert = cert + return cert + + def _auto_create_cert_extra(self, cert): + """ + Sets the organization on the created client certificate. + """ + cert.organization = self.config.device.organization + return cert + + @classmethod + def post_delete(cls, instance, **kwargs): + """ + Receiver of ``post_delete`` signal. + Automatically revokes the certificate when the template is unassigned. + """ + try: + if instance.cert and instance.auto_cert: + instance.cert.revoke() + except ObjectDoesNotExist: + pass diff --git a/openwisp_controller/config/base/template.py b/openwisp_controller/config/base/template.py index 555366b7b..cd152ac08 100644 --- a/openwisp_controller/config/base/template.py +++ b/openwisp_controller/config/base/template.py @@ -22,7 +22,11 @@ logger = logging.getLogger(__name__) -TYPE_CHOICES = (("generic", _("Generic")), ("vpn", _("VPN-client"))) +TYPE_CHOICES = ( + ("generic", _("Generic")), + ("vpn", _("VPN-client")), + ("cert", _("Certificate")), +) def default_auto_cert(): @@ -33,6 +37,18 @@ def default_auto_cert(): return DEFAULT_AUTO_CERT +def get_unassigned_certs(): + Cert = load_model("django_x509", "Cert") + DeviceCertificate = load_model("config", "DeviceCertificate") + assigned_cert_ids = DeviceCertificate.objects.filter( + cert_id__isnull=False + ).values_list("cert_id", flat=True) + return { + "pk__in": Cert.objects.exclude(id__in=assigned_cert_ids), + "revoked": False, + } + + class AbstractTemplate(ShareableOrgMixinUniqueName, BaseConfig): """ Abstract model implementing a @@ -55,6 +71,30 @@ class AbstractTemplate(ShareableOrgMixinUniqueName, BaseConfig): null=True, on_delete=models.CASCADE, ) + ca = models.ForeignKey( + get_model_name("django_x509", "Ca"), + on_delete=models.CASCADE, + verbose_name=_("Certificate Authority"), + blank=True, + null=True, + help_text=_( + "The Certificate Authority that will sign certificates generated " + "by this template." + ), + ) + + blueprint_cert = models.ForeignKey( + get_model_name("django_x509", "Cert"), + on_delete=models.SET_NULL, + verbose_name=_("Blueprint Certificate"), + blank=True, + null=True, + limit_choices_to=get_unassigned_certs, + help_text=_( + "Optional: Select an unassigned certificate to copy extensions and " + "properties from." + ), + ) type = models.CharField( _("type"), max_length=16, @@ -90,7 +130,8 @@ class AbstractTemplate(ShareableOrgMixinUniqueName, BaseConfig): help_text=_( "whether tunnel specific configuration (cryptographic keys, ip addresses, " "etc) should be automatically generated and managed behind the scenes " - "for each configuration using this template, valid only for the VPN type" + "for each configuration using this template, valid only for the VPN and " + "certificate template types" ), ) default_values = JSONField( @@ -212,6 +253,98 @@ def _auto_add_to_existing_configs(self): f"config {config.pk}: {e}" ) + def _validate_cert_template_changes(self): + """ + Prevents changing cert-specific settings of a certificate template + if it is already assigned to active devices. + """ + if self._state.adding: + return + try: + current = self.__class__.objects.get(pk=self.pk) + except self.__class__.DoesNotExist: + return + changing_protected_fields = ( + current.ca_id != self.ca_id + or current.blueprint_cert_id != self.blueprint_cert_id + or (current.type == "cert" and self.type != "cert") + ) + if not changing_protected_fields: + return + + Config = load_model("config", "Config") + if not ( + Config.objects.filter(templates=self) + .exclude(status__in=["deactivating", "deactivated"]) + .exists() + ): + return + + message = _( + "This template is already assigned to active devices. " + "You cannot change the CA or Blueprint Certificate " + "on an active template." + ) + errors = {} + if current.ca_id != self.ca_id: + errors["ca"] = message + if current.blueprint_cert_id != self.blueprint_cert_id: + errors["blueprint_cert"] = message + if current.type == "cert" and self.type != "cert": + errors["type"] = _( + "This template is already assigned to active devices. " + "You cannot change the template type from certificate " + "on an active template." + ) + if errors: + raise ValidationError(errors) + + def _clean_cert_template(self): + """ + Validates requirements specific to templates of type 'cert'. + Clears cert-related fields if the type is not 'cert'. + """ + if self.type == "cert": + self._validate_org_relation("ca") + self._validate_org_relation("blueprint_cert") + if not self.ca: + raise ValidationError( + { + "ca": _( + "A Certificate Authority is required when the template " + "type is certificate." + ) + } + ) + if self.blueprint_cert and self.blueprint_cert.ca_id != self.ca_id: + raise ValidationError( + { + "blueprint_cert": _( + "The selected certificate must match the selected " + "Certificate Authority." + ) + } + ) + if self.blueprint_cert_id: + DeviceCertificate = load_model("config", "DeviceCertificate") + if DeviceCertificate.objects.filter( + cert_id=self.blueprint_cert_id + ).exists(): + raise ValidationError( + { + "blueprint_cert": _( + "This certificate is already assigned to a device. " + "Please select an unassigned certificate to " + "use as a blueprint." + ) + } + ) + if self.config is None: + self.config = {} + else: + self.ca = None + self.blueprint_cert = None + def clean(self, *args, **kwargs): """ * validates org relationship of VPN if present @@ -220,7 +353,11 @@ def clean(self, *args, **kwargs): * clears VPN specific fields if type is not VPN * automatically determines configuration if necessary * if flagged as required forces it also to be default + * prevents mutating cert-specific fields on active cert templates + * enforces CA and Blueprint requirements for cert templates """ + self._validate_cert_template_changes() + self._clean_cert_template() self._validate_org_relation("vpn") if not self.default_values: self.default_values = {} @@ -234,7 +371,8 @@ def clean(self, *args, **kwargs): ) elif self.type != "vpn": self.vpn = None - self.auto_cert = False + if self.type != "cert": + self.auto_cert = False if self.type == "vpn" and not self.config: self.config = self.vpn.auto_client( auto_cert=self.auto_cert, template_backend_class=self.backend_class @@ -242,7 +380,7 @@ def clean(self, *args, **kwargs): if self.required and not self.default: self.default = True super().clean(*args, **kwargs) - if not self.config: + if not self.config and self.type != "cert": raise ValidationError(_("The configuration field cannot be empty.")) def get_context(self, system=False): diff --git a/openwisp_controller/config/handlers.py b/openwisp_controller/config/handlers.py index d838d1ad3..faee94bec 100644 --- a/openwisp_controller/config/handlers.py +++ b/openwisp_controller/config/handlers.py @@ -1,4 +1,5 @@ from django.db import transaction +from django.db.models.signals import post_save, pre_save from django.dispatch import receiver from django.utils.translation import gettext_lazy as _ from openwisp_notifications.signals import notify @@ -6,6 +7,7 @@ from openwisp_controller.config.controller.views import DeviceChecksumView +from . import settings as app_settings from . import tasks from .signals import config_status_changed, device_registered @@ -43,6 +45,70 @@ def device_registered_notification(sender, instance, is_new, **kwargs): ) +def _hardware_fields_changed(update_fields): + return update_fields is None or ( + "name" in update_fields or "mac_address" in update_fields + ) + + +def _hardware_field_was_saved(update_fields, field_name): + return update_fields is None or field_name in update_fields + + +@receiver(pre_save, sender=Device, dispatch_uid="capture_old_hardware_properties") +def capture_old_hardware_properties(sender, instance, **kwargs): + """ + Temporarily caches the old name and mac_address before saving + to detect hardware drift in post_save. + """ + if not instance.pk: + return + if not _hardware_fields_changed(kwargs.get("update_fields")): + return + try: + old_instance = sender.objects.only("name", "mac_address").get(pk=instance.pk) + instance._old_name = old_instance.name + instance._old_mac = old_instance.mac_address + except sender.DoesNotExist: + pass + + +@receiver(post_save, sender=Device, dispatch_uid="detect_hardware_drift") +def detect_hardware_drift(sender, instance, created, **kwargs): + """ + Triggers certificate regeneration if hardware properties (name/mac) change. + """ + if created or not app_settings.REGENERATE_CERTS_ON_HARDWARE_CHANGE: + return + if not _hardware_fields_changed(kwargs.get("update_fields")): + return + update_fields = kwargs.get("update_fields") + name_changed = ( + _hardware_field_was_saved(update_fields, "name") + and getattr(instance, "_old_name", instance.name) != instance.name + ) + mac_changed = ( + _hardware_field_was_saved(update_fields, "mac_address") + and getattr(instance, "_old_mac", instance.mac_address) != instance.mac_address + ) + if name_changed or mac_changed: + DeviceCertificate = load_model("config", "DeviceCertificate") + expected_cert_ids = list( + DeviceCertificate.objects.filter( + config__device=instance, + auto_cert=True, + cert__revoked=False, + template__type="cert", + ).values_list("id", "cert_id") + ) + if expected_cert_ids: + transaction.on_commit( + lambda: tasks.regenerate_device_certificates_task.delay( + str(instance.id), expected_cert_ids + ) + ) + + def devicegroup_change_handler(instance, **kwargs): if type(instance) is list: # changes group templates for multiple devices diff --git a/openwisp_controller/config/migrations/0064_template_blueprint_cert_template_ca_and_more.py b/openwisp_controller/config/migrations/0064_template_blueprint_cert_template_ca_and_more.py new file mode 100644 index 000000000..083c21df8 --- /dev/null +++ b/openwisp_controller/config/migrations/0064_template_blueprint_cert_template_ca_and_more.py @@ -0,0 +1,152 @@ +# Generated by Django 5.2.14 on 2026-05-30 18:31 + +import uuid + +import django.db.models.deletion +import django.utils.timezone +import model_utils.fields +from django.conf import settings +from django.db import migrations, models + +import openwisp_controller.config.base.template + + +class Migration(migrations.Migration): + + dependencies = [ + ("config", "0063_replace_jsonfield_with_django_builtin"), + migrations.swappable_dependency(settings.DJANGO_X509_CA_MODEL), + migrations.swappable_dependency(settings.DJANGO_X509_CERT_MODEL), + ] + + operations = [ + migrations.AddField( + model_name="template", + name="blueprint_cert", + field=models.ForeignKey( + blank=True, + help_text="Optional: Select an unassigned certificate " + "to copy extensions and properties from.", + limit_choices_to=openwisp_controller.config.base.template.get_unassigned_certs, # noqa + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to=settings.DJANGO_X509_CERT_MODEL, + verbose_name="Blueprint Certificate", + ), + ), + migrations.AddField( + model_name="template", + name="ca", + field=models.ForeignKey( + blank=True, + help_text="The Certificate Authority that will sign " + "certificates generated by this template.", + null=True, + on_delete=django.db.models.deletion.CASCADE, + to=settings.DJANGO_X509_CA_MODEL, + verbose_name="Certificate Authority", + ), + ), + migrations.AlterField( + model_name="template", + name="auto_cert", + field=models.BooleanField( + db_index=True, + default=openwisp_controller.config.base.template.default_auto_cert, + help_text="whether tunnel specific configuration " + "(cryptographic keys, ip addresses, etc) should be " + "automatically generated and managed behind the scenes for " + "each configuration using this template, valid only for the " + "VPN and certificate template types", + verbose_name="automatic tunnel provisioning", + ), + ), + migrations.AlterField( + model_name="template", + name="type", + field=models.CharField( + choices=[ + ("generic", "Generic"), + ("vpn", "VPN-client"), + ("cert", "Certificate"), + ], + db_index=True, + default="generic", + help_text="template type, determines which features are available", + max_length=16, + verbose_name="type", + ), + ), + migrations.CreateModel( + name="DeviceCertificate", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="created", + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="modified", + ), + ), + ("auto_cert", models.BooleanField(default=False)), + ( + "cert", + models.OneToOneField( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to=settings.DJANGO_X509_CERT_MODEL, + ), + ), + ( + "config", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to=settings.CONFIG_CONFIG_MODEL, + ), + ), + ( + "template", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to=settings.CONFIG_TEMPLATE_MODEL, + ), + ), + ], + options={ + "verbose_name": "Device certificate", + "verbose_name_plural": "Device certificates", + "abstract": False, + "swappable": "CONFIG_DEVICECERTIFICATE_MODEL", + "unique_together": {("config", "template")}, + }, + ), + migrations.AddField( + model_name="config", + name="device_certificates", + field=models.ManyToManyField( + blank=True, + related_name="config_device_certificates", + through="config.DeviceCertificate", + to=settings.CONFIG_TEMPLATE_MODEL, + verbose_name="device certificates", + ), + ), + ] diff --git a/openwisp_controller/config/models.py b/openwisp_controller/config/models.py index e36280319..8fcae7b1c 100644 --- a/openwisp_controller/config/models.py +++ b/openwisp_controller/config/models.py @@ -2,6 +2,7 @@ from .base.config import AbstractConfig from .base.device import AbstractDevice +from .base.device_certificate import AbstractDeviceCertificate from .base.device_group import AbstractDeviceGroup from .base.multitenancy import ( AbstractOrganizationConfigSettings, @@ -93,6 +94,16 @@ class Meta(AbstractVpnClient.Meta): swappable = swapper.swappable_setting("config", "VpnClient") +class DeviceCertificate(AbstractDeviceCertificate): + """ + m2m through model + """ + + class Meta(AbstractDeviceCertificate.Meta): + abstract = False + swappable = swapper.swappable_setting("config", "DeviceCertificate") + + class OrganizationConfigSettings(AbstractOrganizationConfigSettings): """ Configuration management settings diff --git a/openwisp_controller/config/settings.py b/openwisp_controller/config/settings.py index 55ad86ce3..57b2bf219 100644 --- a/openwisp_controller/config/settings.py +++ b/openwisp_controller/config/settings.py @@ -34,6 +34,9 @@ CONTEXT = get_setting("CONTEXT", {}) assert isinstance(CONTEXT, dict), "OPENWISP_CONTROLLER_CONTEXT must be a dictionary" DEFAULT_AUTO_CERT = get_setting("DEFAULT_AUTO_CERT", True) +REGENERATE_CERTS_ON_HARDWARE_CHANGE = get_setting( + "REGENERATE_CERTS_ON_HARDWARE_CHANGE", True +) CERT_PATH = get_setting("CERT_PATH", "/etc/x509") COMMON_NAME_FORMAT = get_setting("COMMON_NAME_FORMAT", "{mac_address}-{name}") MANAGEMENT_IP_DEVICE_LIST = get_setting("MANAGEMENT_IP_DEVICE_LIST", True) diff --git a/openwisp_controller/config/static/config/js/switcher.js b/openwisp_controller/config/static/config/js/switcher.js index c46acaae3..7a409fc8f 100644 --- a/openwisp_controller/config/static/config/js/switcher.js +++ b/openwisp_controller/config/static/config/js/switcher.js @@ -1,14 +1,18 @@ "use strict"; django.jQuery(function ($) { var type_select = $("#id_type"), - vpn_specific = $(".field-vpn, .field-auto_cert"), + vpn_specific = $(".field-vpn"), + cert_specific = $(".field-ca, .field-blueprint_cert"), + auto_cert_field = $(".field-auto_cert"), + auto_cert_label = $("label[for='id_auto_cert']"), gettext = window.gettext || function (v) { return v; }, - toggle_vpn_specific = function (changed) { - if (type_select.val() == "vpn") { + toggle_specific_fields = function (changed) { + var val = type_select.val(); + if (val === "vpn") { vpn_specific.show(); if ( changed === true && @@ -35,9 +39,30 @@ django.jQuery(function ($) { $(".autovpn").hide(); } } + if (val === "cert") { + cert_specific.show(); + } else { + cert_specific.hide(); + } + if (val === "vpn" || val === "cert") { + auto_cert_field.show(); + } else { + auto_cert_field.hide(); + } + if (val === "vpn" || val === "cert") { + auto_cert_field.show(); + + if (val === "vpn") { + auto_cert_label.text(gettext("Automatic tunnel provisioning")); + } else if (val === "cert") { + auto_cert_label.text(gettext("Automatic certificate provisioning")); + } + } else { + auto_cert_field.hide(); + } }; type_select.on("change", function () { - toggle_vpn_specific(true); + toggle_specific_fields(true); }); - toggle_vpn_specific(); + toggle_specific_fields(); }); diff --git a/openwisp_controller/config/tasks.py b/openwisp_controller/config/tasks.py index 798e40f8c..9f73b713b 100644 --- a/openwisp_controller/config/tasks.py +++ b/openwisp_controller/config/tasks.py @@ -5,10 +5,14 @@ from celery.exceptions import SoftTimeLimitExceeded from django.conf import settings from django.core.exceptions import ObjectDoesNotExist +from django.db import transaction +from django.utils.translation import gettext_lazy as _ +from openwisp_notifications.signals import notify from swapper import load_model from openwisp_utils.tasks import OpenwispCeleryTask +from . import settings as app_settings from .utils import handle_error_notification, handle_recovery_notification logger = logging.getLogger(__name__) @@ -217,3 +221,74 @@ def invalidate_controller_views_cache(organization_id): Vpn.objects.filter(organization_id=organization_id).only("id").iterator() ): GetVpnView.invalidate_get_vpn_cache(vpn) + + +@shared_task(soft_time_limit=1200) +def regenerate_device_certificates_task(device_id, expected_cert_ids=None): + """ + Revokes stale certificates and mints fresh ones when hardware drift occurs. + """ + if not app_settings.REGENERATE_CERTS_ON_HARDWARE_CHANGE: + return + Device = load_model("config", "Device") + DeviceCertificate = load_model("config", "DeviceCertificate") + try: + device = Device.objects.get(id=device_id) + except Device.DoesNotExist: + return + + configs_to_update = set() + certs_regenerated = 0 + + with transaction.atomic(): + qs = DeviceCertificate.objects.select_for_update().filter( + config__device=device, + auto_cert=True, + cert__revoked=False, + template__type="cert", + ) + if expected_cert_ids: + valid_cert_ids = [cert_id for _dc_id, cert_id in expected_cert_ids] + qs = qs.filter(cert_id__in=valid_cert_ids) + active_device_certs = qs.select_related("cert", "config", "template") + if not active_device_certs.exists(): + return + expected_map = dict(expected_cert_ids) if expected_cert_ids else {} + for dc in active_device_certs: + expected_cert_id = expected_map.get(dc.id) + if expected_cert_id is not None and dc.cert_id != expected_cert_id: + continue + old_cert = dc.cert + old_cert.revoke() + new_cert = dc._build_cert( + name=device.name, common_name=dc._get_common_name() + ) + new_cert.full_clean() + new_cert.save() + dc.cert = new_cert + dc.save() + configs_to_update.add(dc.config) + certs_regenerated += 1 + for config in configs_to_update: + config.refresh_from_db() + config.update_status_if_checksum_changed() + if certs_regenerated > 0: + try: + message = _( + "Hardware drift detected on device {device_name}. " + "Successfully regenerated {certs_regenerated} " + "bound X.509 certificate(s)." + ).format(device_name=str(device.name), certs_regenerated=certs_regenerated) + notify.send( + sender=device, + target=device, + action_object=device, + type="generic_message", + verb=_("experienced hardware drift"), + message=message, + level="info", + ) + except (ImportError, Exception) as e: + logger.warning( + f"Could not push regeneration notification for {device.name}: {e}" + ) diff --git a/openwisp_controller/config/tests/test_admin.py b/openwisp_controller/config/tests/test_admin.py index ef36d7dae..83a9e2d55 100644 --- a/openwisp_controller/config/tests/test_admin.py +++ b/openwisp_controller/config/tests/test_admin.py @@ -2332,7 +2332,7 @@ def _verify_template_queries(self, config, count): path = reverse(f"admin:{self.app_label}_device_change", args=[config.device.pk]) for i in range(count): self._create_template(name=f"template-{i}") - expected_count = 22 + expected_count = 23 if django.VERSION < (5, 2): # In django version < 5.2, there is an extra SAVEPOINT query # leading to extra RELEASE SAVEPOINT query, thus 2 extra queries diff --git a/openwisp_controller/config/tests/test_api.py b/openwisp_controller/config/tests/test_api.py index c5ac33950..764d18a31 100644 --- a/openwisp_controller/config/tests/test_api.py +++ b/openwisp_controller/config/tests/test_api.py @@ -31,6 +31,7 @@ Config = load_model("config", "Config") DeviceGroup = load_model("config", "DeviceGroup") OrganizationUser = load_model("openwisp_users", "OrganizationUser") +DeviceCertificate = load_model("config", "DeviceCertificate") class ApiTestMixin: @@ -553,7 +554,7 @@ def test_device_download_api(self): d1 = self._create_device() self._create_config(device=d1) path = reverse("config_api:download_device_config", args=[d1.pk]) - with self.assertNumQueries(7): + with self.assertNumQueries(8): r = self.client.get(path) self.assertEqual(r.status_code, 200) @@ -739,7 +740,7 @@ def _assert_template_list_filter(response=None, template=None): self.assertEqual(response.status_code, 200) data = response.data self.assertEqual(data["count"], 1) - self.assertEqual(len(data["results"][0]), 13) + self.assertEqual(len(data["results"][0]), 15) self.assertEqual(data["results"][0]["id"], str(template.pk)) self.assertEqual(data["results"][0]["name"], str(template.name)) self.assertEqual( @@ -1330,6 +1331,214 @@ def test_bearer_authentication(self): ) self.assertEqual(response.status_code, 200) + def test_template_create_cert_type_api(self): + """Create a template of type 'cert' with a CA""" + org = self._get_org() + ca = self._create_ca(organization=org) + path = reverse("config_api:template_list") + data = self._template_data + data.update( + { + "name": "API Cert Template", + "type": "cert", + "ca": ca.pk, + "organization": str(org.pk), + "config": {}, + } + ) + r = self.client.post(path, data, content_type="application/json") + self.assertEqual(r.status_code, 201) + self.assertEqual(r.data["ca"], ca.pk) + + def test_template_create_cert_rejects_without_ca(self): + """Rejects cert template if CA is missing""" + org = self._get_org() + path = reverse("config_api:template_list") + data = self._template_data + data.update( + { + "name": "API Invalid Cert Template", + "type": "cert", + "organization": str(org.pk), + "config": {}, + } + ) + r = self.client.post(path, data, content_type="application/json") + self.assertEqual(r.status_code, 400) + self.assertIn("ca", r.data) + + def test_template_create_cert_blueprint_assignment(self): + """Can assign a blueprint certificate""" + org = self._get_org() + ca = self._create_ca(organization=org) + blueprint = self._create_cert(ca=ca, organization=org) + path = reverse("config_api:template_list") + data = self._template_data + data.update( + { + "name": "API Blueprint Template", + "type": "cert", + "ca": ca.pk, + "blueprint_cert": blueprint.pk, + "organization": str(org.pk), + "config": {}, + } + ) + r = self.client.post(path, data, content_type="application/json") + self.assertEqual(r.status_code, 201) + self.assertEqual(r.data["blueprint_cert"], blueprint.pk) + + def test_template_create_api_blueprint_ca_mismatch(self): + """Blueprint cert must belong to the selected CA""" + org = self._get_org() + ca1 = self._create_ca(name="CA1", common_name="CA1", organization=org) + ca2 = self._create_ca(name="CA2", common_name="CA2", organization=org) + blueprint = self._create_cert(name="BP", ca=ca1, organization=org) + path = reverse("config_api:template_list") + data = self._template_data + data.update( + { + "name": "Mismatch Template", + "type": "cert", + "ca": ca2.pk, + "blueprint_cert": blueprint.pk, + "organization": str(org.pk), + "config": {}, + } + ) + r = self.client.post(path, data, content_type="application/json") + self.assertEqual(r.status_code, 400) + self.assertIn("blueprint_cert", r.data) + self.assertIn( + "match the selected Certificate Authority", str(r.data["blueprint_cert"]) + ) + + def test_template_create_api_blueprint_already_assigned(self): + """Serializer correctly rejects an already assigned blueprint_cert""" + org = self._get_org() + ca = self._create_ca(organization=org) + blueprint = self._create_cert(ca=ca, organization=org) + device = self._create_device(organization=org) + config = self._create_config(device=device) + dummy_template = self._create_template( + type="cert", ca=ca, organization=org, config={} + ) + DeviceCertificate.objects.create( + config=config, template=dummy_template, cert=blueprint + ) + path = reverse("config_api:template_list") + data = self._template_data + data.update( + { + "name": "Assigned BP Template", + "type": "cert", + "ca": str(ca.pk), + "blueprint_cert": str(blueprint.pk), + "organization": str(org.pk), + "config": {}, + } + ) + r = self.client.post(path, data, content_type="application/json") + self.assertEqual(r.status_code, 400) + self.assertIn("blueprint_cert", r.data) + self.assertIn( + "already assigned to a device configuration profile.", + str(r.data["blueprint_cert"]), + ) + + def test_template_update_api_active_change_blocked(self): + """Cannot mutate cert-specific fields on active templates""" + org = self._get_org() + ca1 = self._create_ca(name="CA1", common_name="CA1", organization=org) + ca2 = self._create_ca(name="CA2", common_name="CA2", organization=org) + blueprint = self._create_cert( + name="BP", common_name="BP_CN", ca=ca1, organization=org + ) + template = self._create_template( + name="Active Template", type="cert", ca=ca1, organization=org, config={} + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.add(template) + path = reverse("config_api:template_detail", args=[template.pk]) + data = {"ca": ca2.pk} + r = self.client.patch(path, data, content_type="application/json") + self.assertEqual(r.status_code, 400) + self.assertIn("ca", r.data) + self.assertIn("already assigned to active devices", str(r.data["ca"])) + r = self.client.patch( + path, {"blueprint_cert": blueprint.pk}, content_type="application/json" + ) + self.assertEqual(r.status_code, 400) + self.assertIn("blueprint_cert", r.data) + self.assertIn( + "already assigned to active devices", str(r.data["blueprint_cert"]) + ) + r = self.client.patch( + path, {"type": "generic"}, content_type="application/json" + ) + self.assertEqual(r.status_code, 400) + self.assertIn("type", r.data) + self.assertIn("already assigned to active devices", str(r.data["type"])) + + def test_template_create_api_org_scoping(self): + """Rejects CA or Blueprint from a different organization""" + org1 = self._get_org() + org2 = self._create_org(name="Org2", slug="org2") + ca_org2 = self._create_ca(name="CA2", common_name="CA2", organization=org2) + path = reverse("config_api:template_list") + data = self._template_data + data.update( + { + "name": "Org Scope Template", + "type": "cert", + "ca": ca_org2.pk, + "organization": str(org1.pk), + "config": {}, + } + ) + r = self.client.post(path, data, content_type="application/json") + self.assertEqual(r.status_code, 400) + self.assertIn("organization", r.data) + self.assertIn("related CA match", str(r.data["organization"])) + + def test_device_api_cert_template_lifecycle(self): + """Assigning/removing a cert template triggers DeviceCertificate lifecycle""" + org = self._get_org() + ca = self._create_ca(organization=org) + cert_template = self._create_template( + name="API Lifecycle Cert", type="cert", ca=ca, organization=org, config={} + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + path = reverse("config_api:device_detail", args=[device.pk]) + data = self._device_data + data.update( + { + "name": device.name, + "organization": str(org.pk), + "mac_address": device.mac_address, + } + ) + with self.subTest("Assigning template via API creates DeviceCertificate"): + data["config"]["templates"] = [str(cert_template.pk)] + response = self.client.put(path, data, content_type="application/json") + self.assertEqual(response.status_code, 200, response.data) + self.assertEqual(config.templates.count(), 1) + self.assertEqual(config.devicecertificate_set.count(), 1) + generated_cert = config.devicecertificate_set.get().cert + + with self.subTest( + "Removing template via API deletes/revokes DeviceCertificate" + ): + data["config"]["templates"] = [] + response = self.client.put(path, data, content_type="application/json") + self.assertEqual(response.status_code, 200, response.data) + self.assertEqual(config.templates.count(), 0) + self.assertEqual(config.devicecertificate_set.count(), 0) + generated_cert.refresh_from_db() + self.assertTrue(generated_cert.revoked) + class TestConfigApiTransaction( ApiTestMixin, diff --git a/openwisp_controller/config/tests/test_config.py b/openwisp_controller/config/tests/test_config.py index 0d3f6772e..4816132a6 100644 --- a/openwisp_controller/config/tests/test_config.py +++ b/openwisp_controller/config/tests/test_config.py @@ -861,13 +861,13 @@ def test_config_modified_sent(self): def test_check_changes_query(self): config = self._create_config(organization=self._get_org()) with self.subTest("No changes made to the config object"): - with self.assertNumQueries(3): + with self.assertNumQueries(4): config._check_changes() with self.subTest("Changes made to the config object"): config.templates.add(self._create_template()) config.config = {"general": {"description": "test"}} - with self.assertNumQueries(4): + with self.assertNumQueries(5): config._check_changes() def test_config_get_system_context(self): diff --git a/openwisp_controller/config/tests/test_device.py b/openwisp_controller/config/tests/test_device.py index 1e2bd4a3e..f3188f410 100644 --- a/openwisp_controller/config/tests/test_device.py +++ b/openwisp_controller/config/tests/test_device.py @@ -5,6 +5,7 @@ from django.test import TestCase, TransactionTestCase from swapper import load_model +from openwisp_controller.config.tasks import regenerate_device_certificates_task from openwisp_utils.tests import AssertNumQueriesSubTestMixin, catch_signal from .. import settings as app_settings @@ -25,6 +26,9 @@ Config = load_model("config", "Config") Device = load_model("config", "Device") DeviceGroup = load_model("config", "DeviceGroup") +DeviceCertificate = load_model("config", "DeviceCertificate") +Cert = load_model("django_x509", "Cert") +Ca = load_model("django_x509", "Ca") OrganizationConfigSettings = load_model("config", "OrganizationConfigSettings") _original_context = app_settings.CONTEXT.copy() @@ -653,6 +657,262 @@ def test_create_default_config_existing(self): self.assertEqual(device.config.context, {"ssid": "test"}) self.assertEqual(device.config.config, {"general": {}}) + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_hardware_drift_signal_triggers_on_name_change(self, mocked_task): + """Proof that changing the hostname fires the celery task.""" + org = self._create_org() + device = self._create_device(organization=org, name="old-router-name") + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device.name = "new-router-name" + with self.captureOnCommitCallbacks(execute=True): + device.save() + self.assertEqual(mocked_task.call_count, 1) + + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_hardware_drift_signal_triggers_on_mac_change(self, mocked_task): + """Proof that changing the MAC address fires the celery task.""" + org = self._create_org() + device = self._create_device(organization=org, mac_address="00:11:22:33:44:55") + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device.mac_address = "AA:BB:CC:DD:EE:FF" + with self.captureOnCommitCallbacks(execute=True): + device.save() + self.assertEqual(mocked_task.call_count, 1) + + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_hardware_drift_signal_ignores_unrelated_changes(self, mocked_task): + """Proof that saving a device without changing name/MAC does nothing.""" + org = self._create_org() + device = self._create_device(organization=org) + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device.key = "new-management-key" + with self.captureOnCommitCallbacks(execute=True): + device.save() + mocked_task.assert_not_called() + + @mock.patch( + "openwisp_controller.config.settings.REGENERATE_CERTS_ON_HARDWARE_CHANGE", False + ) + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_hardware_drift_setting_disables_regeneration(self, mocked_task): + """Proof that the user-configurable setting turns the feature off.""" + org = self._create_org() + device = self._create_device(organization=org) + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device.name = "another-new-name" + with self.captureOnCommitCallbacks(execute=True): + device.save() + mocked_task.assert_not_called() + + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_hardware_drift_partial_save_ignores_dirty_memory(self, mocked_task): + """ + Proof that dirty in-memory fields are + not evaluated if not in update_fields. + """ + org = self._create_org() + device = self._create_device( + organization=org, name="old-name", mac_address="00:11:22:33:44:55" + ) + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device.mac_address = "AA:BB:CC:DD:EE:FF" + with self.captureOnCommitCallbacks(execute=True): + device.save(update_fields=["name"]) + mocked_task.assert_not_called() + + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_task_regenerates_and_revokes(self, mocked_task): + """Proof that the task physically revokes the old and mints the new.""" + org = self._create_org() + device = self._create_device( + organization=org, name="old-router-name", mac_address="00:11:22:33:44:55" + ) + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device_cert = DeviceCertificate.objects.get(config=config, template=template) + old_cert_id = device_cert.cert.id + self.assertFalse(device_cert.cert.revoked) + device.name = "renamed-router" + device.mac_address = "AA:BB:CC:DD:EE:FF" + device.save() + regenerate_device_certificates_task(str(device.id)) + device_cert.refresh_from_db() + old_cert = Cert.objects.get(id=old_cert_id) + self.assertTrue(old_cert.revoked) + self.assertNotEqual(old_cert_id, device_cert.cert.id) + self.assertIn("renamed-router", device_cert.cert.common_name) + self.assertIn("AA:BB:CC:DD:EE:FF", device_cert.cert.common_name) + extensions = device_cert.cert.extensions + mac_ext = next( + (ext for ext in extensions if ext.get("oid") == "1.3.6.1.4.1.65901.1"), None + ) + uuid_ext = next( + (ext for ext in extensions if ext.get("oid") == "1.3.6.1.4.1.65901.2"), None + ) + self.assertIsNotNone(mac_ext, "MAC Address OID 1.3.6.1.4.1.65901.1 is missing") + self.assertIn("AA:BB:CC:DD:EE:FF", mac_ext["value"]) + self.assertIsNotNone(uuid_ext, "Device UUID OID 1.3.6.1.4.1.65901.2 is missing") + self.assertIn(str(device.id), uuid_ext["value"]) + + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_regeneration_preserves_blueprint_extensions(self, mocked_task): + """Proof that blueprint extensions carry over alongside custom OIDs.""" + org = self._create_org() + device = self._create_device( + organization=org, name="old-router-name", mac_address="00:11:22:33:44:55" + ) + ca = Ca.objects.create(name="test-ca", organization=org) + blueprint_cert = Cert( + name="enterprise-blueprint", + ca=ca, + organization=org, + common_name="blueprint", + extensions=[{"name": "nsCertType", "value": "server", "critical": False}], + ) + blueprint_cert.full_clean() + blueprint_cert.save() + template = self._create_template( + organization=org, + type="cert", + ca=ca, + auto_cert=True, + blueprint_cert=blueprint_cert, + ) + config = self._create_config(device=device) + config.templates.add(template) + device_cert = DeviceCertificate.objects.get(config=config, template=template) + device.mac_address = "AA:BB:CC:DD:EE:FF" + device.save() + regenerate_device_certificates_task(str(device.id)) + device_cert.refresh_from_db() + extensions = device_cert.cert.extensions + ns_ext = next( + (ext for ext in extensions if ext.get("name") == "nsCertType"), None + ) + self.assertIsNotNone( + ns_ext, "Blueprint extension was lost during regeneration!" + ) + self.assertEqual(ns_ext["value"], "server") + self.assertFalse(ns_ext.get("critical", False)) + mac_ext = next( + (ext for ext in extensions if ext.get("oid") == "1.3.6.1.4.1.65901.1"), None + ) + self.assertIsNotNone(mac_ext, "MAC Address OID is missing!") + self.assertIn("AA:BB:CC:DD:EE:FF", mac_ext["value"]) + + @mock.patch("openwisp_controller.config.tasks.notify.send") + def test_regeneration_triggers_toast_notification(self, mock_notify): + """Proof that a successful regeneration sends a generic_message toast.""" + org = self._create_org() + device = self._create_device( + organization=org, name="old-router-name", mac_address="00:11:22:33:44:55" + ) + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device.name = "renamed-router" + device.save() + regenerate_device_certificates_task(str(device.id)) + mock_notify.assert_called_once() + _, kwargs = mock_notify.call_args + self.assertEqual(kwargs.get("type"), "generic_message") + self.assertEqual(kwargs.get("verb"), "experienced hardware drift") + self.assertIn("renamed-router", kwargs.get("message")) + + @mock.patch( + "openwisp_controller.config.tasks.regenerate_device_certificates_task.delay" + ) + def test_regeneration_idempotent_when_duplicate_tasks(self, mocked_task): + """Proof that a duplicate task with stale cert IDs does not rotate again.""" + org = self._create_org() + device = self._create_device( + organization=org, name="old-router-name", mac_address="00:11:22:33:44:55" + ) + ca = Ca.objects.create(name="test-ca", organization=org) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + config = self._create_config(device=device) + config.templates.add(template) + device_cert = DeviceCertificate.objects.get(config=config, template=template) + original_cert_id = device_cert.cert.id + self.assertFalse(device_cert.cert.revoked) + expected_cert_ids = list( + DeviceCertificate.objects.filter( + config__device=device, + auto_cert=True, + cert__revoked=False, + template__type="cert", + ).values_list("id", "cert_id") + ) + device.name = "renamed-router" + device.save() + regenerate_device_certificates_task(str(device.id), expected_cert_ids) + device_cert.refresh_from_db() + first_new_cert_id = device_cert.cert.id + self.assertNotEqual(original_cert_id, first_new_cert_id) + self.assertFalse(device_cert.cert.revoked) + old_cert = Cert.objects.get(id=original_cert_id) + self.assertTrue(old_cert.revoked) + # Run the same task again with the same stale expected_cert_ids + # to simulate a duplicate task that was queued before the first + # task completed. The second task should skip because the cert + # IDs no longer match. + regenerate_device_certificates_task(str(device.id), expected_cert_ids) + device_cert.refresh_from_db() + self.assertEqual( + first_new_cert_id, + device_cert.cert.id, + "Duplicate task rotated the cert again despite stale expected_cert_ids", + ) + self.assertFalse(device_cert.cert.revoked) + class TestTransactionDevice( CreateConfigTemplateMixin, diff --git a/openwisp_controller/config/tests/test_selenium.py b/openwisp_controller/config/tests/test_selenium.py index eb09ff280..56028eeb6 100644 --- a/openwisp_controller/config/tests/test_selenium.py +++ b/openwisp_controller/config/tests/test_selenium.py @@ -22,6 +22,8 @@ Device = load_model("config", "Device") DeviceGroup = load_model("config", "DeviceGroup") Cert = load_model("django_x509", "Cert") +DeviceCertificate = load_model("config", "DeviceCertificate") +Notification = load_model("openwisp_notifications", "Notification") class SeleniumTestMixin(BaseSeleniumTestMixin): @@ -486,6 +488,78 @@ def test_relevant_templates_duplicates(self): self.find_element(by=By.NAME, value="_save").click() self.wait_for_presence(By.CSS_SELECTOR, ".messagelist .success", timeout=5) + def test_e2e_certificate_provisioning(self): + """ + End-to-end flow: create CA, create certificate + template, assign to device, verify certificate generation. + """ + org = self._get_org() + ca = self._create_ca( + name="test-ca", + common_name="test-ca", + organization=org, + ) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + device = self._create_device(organization=org, name="e2e-router") + self._create_config(device=device) + + self.login() + self.open( + reverse(f"admin:{self.config_app_label}_device_change", args=[device.id]) + + "#config-group" + ) + self.hide_loading_overlay() + self.find_element(by=By.XPATH, value=f'//*[@value="{template.id}"]').click() + self.web_driver.execute_script( + 'document.querySelector("#ow-user-tools").style.display="none"' + ) + self.find_element(by=By.NAME, value="_continue").click() + self.wait_for_presence(By.CSS_SELECTOR, ".messagelist .success", timeout=5) + device.config.refresh_from_db() + device_cert = DeviceCertificate.objects.get( + config=device.config, template=template + ) + self.assertIsNotNone( + device_cert.cert, "Certificate was not generated after UI assignment!" + ) + self.assertFalse(device_cert.cert.revoked) + self.assertEqual(device_cert.cert.name, "e2e-router") + + def test_hardware_drift_notification(self): + org = self._get_org() + ca = self._create_ca( + name="hardware-drift-ca", + common_name="hardware-drift-ca", + organization=org, + ) + template = self._create_template( + organization=org, type="cert", ca=ca, auto_cert=True + ) + device = self._create_device(organization=org, name="old-router-name") + config = self._create_config(device=device) + config.templates.add(template) + self.login() + self.open( + reverse(f"admin:{self.config_app_label}_device_change", args=[device.id]) + ) + self.hide_loading_overlay() + name_input = self.find_element(by=By.NAME, value="name", wait_for="presence") + name_input.clear() + name_input.send_keys("renamed-router") + # Hide user tools because it covers the save button + self.web_driver.execute_script( + 'document.querySelector("#ow-user-tools").style.display="none"' + ) + self.find_element(by=By.NAME, value="_save").click() + self.wait_for_presence(By.CSS_SELECTOR, ".messagelist .success", timeout=5) + self.find_element(by=By.ID, value="openwisp_notifications").click() + notification = self.wait_for_visibility( + By.CLASS_NAME, "ow-notification-elem", timeout=10 + ) + self.assertIn("Hardware drift detected", notification.text) + @tag("selenium_tests") class TestDeviceGroupAdmin( @@ -767,3 +841,38 @@ def test_vpn_edit(self): backend.select_by_visible_text("OpenVPN") self.wait_for_invisibility(by=By.CLASS_NAME, value="field-webhook_endpoint") self.wait_for_invisibility(by=By.CLASS_NAME, value="field-auth_token") + + +@tag("selenium_tests") +class TestTemplateAdmin( + SeleniumTestMixin, + CreateConfigTemplateMixin, + TestVpnX509Mixin, + StaticLiveServerTestCase, +): + def test_certificate_fields_visibility(self): + """ + Ensure CA and Blueprint fields are only visible when type is 'cert'. + """ + self.login() + self.open(reverse(f"admin:{self.config_app_label}_template_add")) + + # Wait for the Type dropdown to load + self.wait_for_presence(By.ID, "id_type") + + with self.subTest("CA and Cert fields should be hidden by default (Generic)"): + self.wait_for_invisibility(By.CLASS_NAME, "field-ca") + self.wait_for_invisibility(By.CLASS_NAME, "field-blueprint_cert") + + with self.subTest("Changing type to 'Certificate' should show fields"): + type_select = Select(self.find_element(by=By.ID, value="id_type")) + type_select.select_by_value("cert") + + self.wait_for_visibility(By.CLASS_NAME, "field-ca") + self.wait_for_visibility(By.CLASS_NAME, "field-blueprint_cert") + + with self.subTest("Changing type back to 'Generic' should hide fields"): + type_select.select_by_value("generic") + + self.wait_for_invisibility(By.CLASS_NAME, "field-ca") + self.wait_for_invisibility(By.CLASS_NAME, "field-blueprint_cert") diff --git a/openwisp_controller/config/tests/test_template.py b/openwisp_controller/config/tests/test_template.py index a8118a71c..335625b8e 100644 --- a/openwisp_controller/config/tests/test_template.py +++ b/openwisp_controller/config/tests/test_template.py @@ -4,7 +4,7 @@ from celery.exceptions import SoftTimeLimitExceeded from django.contrib.auth import get_user_model from django.core.exceptions import PermissionDenied, ValidationError -from django.db import transaction +from django.db import IntegrityError, transaction from django.test import TestCase, TransactionTestCase from netjsonconfig import OpenWrt from netjsonconfig.exceptions import ValidationError as NetjsonconfigValidationError @@ -13,6 +13,7 @@ from openwisp_utils.tests import catch_signal from .. import settings as app_settings +from ..base.template import get_unassigned_certs from ..signals import config_modified, config_status_changed from ..tasks import auto_add_template_to_existing_configs from ..tasks import logger as task_logger @@ -26,6 +27,7 @@ Ca = load_model("django_x509", "Ca") Cert = load_model("django_x509", "Cert") User = get_user_model() +DeviceCertificate = load_model("config", "DeviceCertificate") _original_context = app_settings.CONTEXT.copy() @@ -586,7 +588,7 @@ def test_config_status_modified_after_change(self): with catch_signal(config_status_changed) as handler: t.config["interfaces"][0]["name"] = "eth2" t.full_clean() - with self.assertNumQueries(13): + with self.assertNumQueries(15): t.save() c.refresh_from_db() handler.assert_not_called() @@ -967,3 +969,532 @@ def test_auto_add_template_to_existing_configs_not_triggered(self, mocked_task): required_template.full_clean() required_template.save() mocked_task.assert_not_called() + + def test_standalone_cert_renewal_updates_config_status(self): + org = self._get_org() + ca = self._create_ca(organization=org) + template = self._create_template( + name="cert-renewal-test", + type="cert", + ca=ca, + organization=org, + config={}, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.add(template) + dc = config.devicecertificate_set.first() + self.assertIsNotNone(dc) + config.status = "applied" + config.save(update_fields=["status"]) + config.refresh_from_db() + with mock.patch.object( + Config, "update_status_if_checksum_changed" + ) as mocked_update: + dc.cert.renew() + mocked_update.assert_called_once() + + +class TestTemplateCertificates(CreateConfigTemplateMixin, TestVpnX509Mixin, TestCase): + """ + tests for standalone X.509 certificate Template configurations + """ + + def test_cert_template_requires_ca(self): + """Test that creating a template with type='cert' requires a CA.""" + try: + self._create_template(type="cert", config={}) + except ValidationError as err: + self.assertIn("ca", err.message_dict) + self.assertIn("required", str(err.message_dict["ca"][0])) + else: + self.fail("ValidationError not raised for missing CA") + + def test_blueprint_must_match_ca(self): + """Test that blueprint_cert must match the selected CA.""" + org = self._get_org() + ca_main = self._create_ca( + name="Main CA", common_name="Main CA", organization=org + ) + ca_other = self._create_ca( + name="Other CA", common_name="Other CA", organization=org + ) + blueprint = self._create_cert( + name="Master Blueprint", ca=ca_main, organization=org + ) + try: + self._create_template( + type="cert", + ca=ca_other, + blueprint_cert=blueprint, + organization=org, + config={}, + ) + except ValidationError as err: + self.assertIn("blueprint_cert", err.message_dict) + self.assertIn("match", str(err.message_dict["blueprint_cert"][0])) + else: + self.fail("ValidationError not raised for CA mismatch") + + def test_blueprint_cannot_be_already_assigned(self): + """Test that blueprint_cert cannot be already assigned to a device.""" + org = self._get_org() + ca = self._create_ca(organization=org) + blueprint = self._create_cert(ca=ca, organization=org) + config = self._create_config(organization=org) + template = self._create_template( + name="cert-template", + type="cert", + ca=ca, + organization=org, + config={}, + ) + DeviceCertificate.objects.create( + config=config, + template=template, + cert=blueprint, + auto_cert=True, + ) + try: + self._create_template( + type="cert", + ca=ca, + blueprint_cert=blueprint, + organization=org, + config={}, + ) + except ValidationError as err: + self.assertIn("blueprint_cert", err.message_dict) + error_messages = " ".join(err.message_dict["blueprint_cert"]) + self.assertIn("already assigned", error_messages) + else: + self.fail("ValidationError not raised for assigned blueprint") + + def test_non_cert_clears_fields(self): + """Test that non-cert template types clear ca and blueprint_cert fields.""" + org = self._get_org() + ca = self._create_ca(organization=org) + blueprint = self._create_cert(ca=ca, organization=org) + t = self._create_template( + type="cert", + ca=ca, + blueprint_cert=blueprint, + organization=org, + config={}, + ) + t.type = "generic" + t.backend = "netjsonconfig.OpenWrt" + t.config = {"interfaces": [{"name": "eth0", "type": "ethernet"}]} + t.full_clean() + self.assertIsNone(t.ca) + self.assertIsNone(t.blueprint_cert) + + def test_organization_validation_for_relations(self): + """Test organization validation for ca field.""" + org1 = self._get_org() + org2 = self._create_org(name="Org2", slug="org2") + ca_org2 = self._create_ca(organization=org2) + + try: + self._create_template( + type="cert", + ca=ca_org2, + organization=org1, + config={}, + ) + except ValidationError as err: + self.assertIn("organization", err.message_dict) + self.assertIn("related CA match", str(err.message_dict["organization"][0])) + else: + self.fail("ValidationError not raised for cross-organization CA relation") + + def test_organization_validation_skipped_for_non_cert(self): + """ + Organization validation for 'ca' and 'blueprint_cert' + should not run if the template is not type 'cert'. + """ + org1 = self._get_org() + org2 = self._create_org(name="Org2", slug="org2") + ca_org2 = self._create_ca(organization=org2) + blueprint_org2 = self._create_cert(ca=ca_org2, organization=org2) + t = self._create_template( + name="stale-relations", + type="generic", + backend="netjsonconfig.OpenWrt", + ca=ca_org2, + blueprint_cert=blueprint_org2, + organization=org1, + config={"interfaces": [{"name": "eth0", "type": "ethernet"}]}, + ) + try: + t.full_clean() + except ValidationError as err: + if "organization" in err.message_dict: + self.fail( + "Organization validation ran on stale cert relations " + "for a non-cert template." + ) + raise err + self.assertIsNone(t.ca) + self.assertIsNone(t.blueprint_cert) + + def test_active_mutation_blocked(self): + """ + Test that cert-specific fields cannot be changed + if assigned to active devices. + """ + org = self._get_org() + ca1 = self._create_ca(name="CA1", common_name="CA1", organization=org) + ca2 = self._create_ca(name="CA2", common_name="CA2", organization=org) + blueprint1 = self._create_cert( + name="BP1", common_name="BP1_CN", ca=ca1, organization=org + ) + blueprint2 = self._create_cert( + name="BP2", common_name="BP2_CN", ca=ca1, organization=org + ) + template = self._create_template( + name="Active Cert Template", + type="cert", + ca=ca1, + blueprint_cert=blueprint1, + organization=org, + config={}, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.add(template) + with self.subTest("Cannot mutate CA on active template"): + template.ca = ca2 + template.blueprint_cert = None + try: + template.full_clean() + except ValidationError as err: + self.assertIn("ca", err.message_dict) + self.assertIn( + "already assigned to active devices", str(err.message_dict["ca"][0]) + ) + else: + self.fail("ValidationError not raised for active mutation of CA") + + with self.subTest("Cannot mutate blueprint_cert on active template"): + template.refresh_from_db() + template.blueprint_cert = blueprint2 + try: + template.full_clean() + except ValidationError as err: + self.assertIn("blueprint_cert", err.message_dict) + self.assertIn( + "already assigned to active devices", + str(err.message_dict["blueprint_cert"][0]), + ) + else: + self.fail( + "ValidationError not raised for active mutation of blueprint_cert" + ) + + with self.subTest("Cannot change type away from cert on active template"): + template.refresh_from_db() + template.type = "generic" + try: + template.full_clean() + except ValidationError as err: + self.assertIn("type", err.message_dict) + self.assertIn( + "already assigned to active devices", + str(err.message_dict["type"][0]), + ) + else: + self.fail( + "ValidationError not raised for active mutation of template type" + ) + + def test_cert_generation_fallback_to_ca_defaults(self): + """ + Verify synchronous generation falls back to CA defaults + when no blueprint_cert is specified. + """ + org = self._get_org() + ca = self._create_ca( + name="Fallback CA", + organization=org, + country_code="DE", + city="Berlin", + key_length="2048", + digest="sha256", + ) + template = self._create_template( + name="No Blueprint Template", + type="cert", + ca=ca, + blueprint_cert=None, + organization=org, + config={}, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.add(template) + dev_cert = config.devicecertificate_set.first() + self.assertIsNotNone(dev_cert) + generated_cert = dev_cert.cert + self.assertEqual(generated_cert.country_code, "DE") + self.assertEqual(generated_cert.city, "Berlin") + self.assertEqual(generated_cert.key_length, "2048") + self.assertEqual(generated_cert.digest, "sha256") + + def test_cert_generation_and_revocation_lifecycle(self): + """ + Verify synchronous generation copies blueprint attributes, injects CN/OIDs, + and securely revokes the underlying certificate upon removal. + """ + org = self._get_org() + ca = self._create_ca(organization=org) + blueprint = self._create_cert( + name="Master Blueprint", + common_name="Master Blueprint CN", + ca=ca, + organization=org, + key_length="4096", + digest="sha512", + city="Rome", + country_code="IT", + extensions=[ + {"name": "nsComment", "value": "Test comment", "critical": False} + ], + ) + template = self._create_template( + name="Deep Lifecycle Template", + type="cert", + ca=ca, + blueprint_cert=blueprint, + organization=org, + config={}, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.add(template) + dev_cert = config.devicecertificate_set.first() + self.assertIsNotNone(dev_cert, "DeviceCertificate was not created") + generated_cert = dev_cert.cert + with self.subTest("Copies blueprint attributes and sets CN"): + self.assertEqual(generated_cert.key_length, "4096") + self.assertEqual(generated_cert.digest, "sha512") + self.assertEqual(generated_cert.city, "Rome") + self.assertEqual(generated_cert.country_code, "IT") + self.assertTrue( + generated_cert.common_name.startswith( + f"{device.mac_address}-{device.name}" + ) + ) + + with self.subTest("Injects custom MAC and UUID OIDs"): + extensions = generated_cert.extensions + mac_oid = "1.3.6.1.4.1.65901.1" + uuid_oid = "1.3.6.1.4.1.65901.2" + mac_ext = next( + ( + ext + for ext in extensions + if ext.get("oid") == mac_oid or ext.get("name") == mac_oid + ), + None, + ) + uuid_ext = next( + ( + ext + for ext in extensions + if ext.get("oid") == uuid_oid or ext.get("name") == uuid_oid + ), + None, + ) + self.assertIsNotNone(mac_ext, "MAC OID extension missing") + self.assertIn(device.mac_address, mac_ext["value"]) + self.assertIsNotNone(uuid_ext, "UUID OID extension missing") + self.assertIn(str(device.id), uuid_ext["value"]) + + with self.subTest("Secure Revocation (post_remove)"): + cert_pk = generated_cert.pk + config.templates.remove(template) + self.assertEqual(config.devicecertificate_set.count(), 0) + revoked_cert = Cert.objects.get(pk=cert_pk) + self.assertTrue( + revoked_cert.revoked, "Underlying certificate was not revoked!" + ) + + def test_device_certificate_autocert_save_is_atomic(self): + """ + Ensure certificate auto-provisioning does not leak Cert rows + when DeviceCertificate save fails. + """ + org = self._get_org() + ca = self._create_ca(organization=org) + template = self._create_template( + name="Atomic Cert Template", + type="cert", + ca=ca, + organization=org, + config={}, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + DeviceCertificate.objects.create( + config=config, + template=template, + cert=self._create_cert(ca=ca, organization=org), + auto_cert=False, + ) + cert_count = Cert.objects.count() + + with self.assertRaises(IntegrityError): + DeviceCertificate.objects.create( + config=config, + template=template, + auto_cert=True, + ) + + self.assertEqual(Cert.objects.count(), cert_count) + + def test_cert_template_reorder_does_not_revoke(self): + """ + Verify that reordering templates or performing bulk .set() updates + does not delete and recreate the existing DeviceCertificate. + """ + org = self._get_org() + ca = self._create_ca(organization=org) + cert_template = self._create_template( + name="Cert Template", type="cert", ca=ca, organization=org, config={} + ) + regular_template = self._create_template( + name="Regular Template", + organization=org, + config={"system": {"hostname": "test_router"}}, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.set([regular_template, cert_template]) + dev_cert = config.devicecertificate_set.first() + self.assertIsNotNone(dev_cert, "DeviceCertificate should be created.") + original_dev_cert_id = dev_cert.pk + original_cert_id = dev_cert.cert.pk + config.templates.set([cert_template, regular_template], clear=True) + dev_certs = config.devicecertificate_set.all() + self.assertEqual(dev_certs.count(), 1) + surviving_dev_cert = dev_certs.first() + self.assertEqual( + surviving_dev_cert.pk, + original_dev_cert_id, + "DeviceCertificate was deleted and recreated during reordering!", + ) + self.assertEqual( + surviving_dev_cert.cert.pk, + original_cert_id, + "Underlying X.509 Cert was replaced during reordering!", + ) + self.assertFalse( + surviving_dev_cert.cert.revoked, + "Certificate was erroneously revoked during reordering!", + ) + + def test_cert_template_partial_replacement_cleans_up(self): + """ + Verify that when a subset of templates is replaced via .set(..., clear=True), + the DeviceCertificates for the removed templates are correctly deleted, + while the kept ones survive. + """ + org = self._get_org() + ca = self._create_ca(organization=org) + + cert_template_1 = self._create_template( + name="Cert Template 1", type="cert", ca=ca, organization=org, config={} + ) + cert_template_2 = self._create_template( + name="Cert Template 2", type="cert", ca=ca, organization=org, config={} + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.set([cert_template_1, cert_template_2]) + self.assertEqual( + config.devicecertificate_set.count(), 2, "Two certs should be created" + ) + kept_cert_id = config.devicecertificate_set.get(template=cert_template_1).pk + removed_dc = config.devicecertificate_set.get(template=cert_template_2) + removed_cert_id = removed_dc.pk + removed_cert_pk = removed_dc.cert_id + removed_cert = Cert.objects.get(pk=removed_cert_pk) + config.templates.set([cert_template_1], clear=True) + self.assertEqual( + config.devicecertificate_set.count(), 1, "Only one cert should remain" + ) + self.assertTrue( + config.devicecertificate_set.filter(pk=kept_cert_id).exists(), + "The kept template's certificate should have survived.", + ) + self.assertFalse( + config.devicecertificate_set.filter(pk=removed_cert_id).exists(), + "The removed template's certificate should have been deleted.", + ) + removed_cert.refresh_from_db() + self.assertTrue( + removed_cert.revoked, + "The removed template's underlying certificate should be revoked.", + ) + + def test_get_unassigned_certs_with_null_device_cert(self): + """ + Test that a DeviceCertificate with cert=None does not poison + the get_unassigned_certs() SQL query due to NULL semantics. + """ + org = self._get_org() + ca = self._create_ca(name="Test-CA", organization=org) + unassigned_cert = self._create_cert( + name="Available-Blueprint", ca=ca, organization=org + ) + device = self._create_device(name="Test-Device", organization=org) + config = self._create_config(device=device) + template = self._create_template( + name="Test-Template", type="cert", ca=ca, organization=org, config={} + ) + device_cert = DeviceCertificate.objects.create( + config=config, template=template, cert=None, auto_cert=False + ) + self.assertIsNone(device_cert.cert_id) + choices = get_unassigned_certs() + queryset = choices.get("pk__in") + self.assertIsNotNone(queryset) + self.assertIn(unassigned_cert, queryset) + + def test_certificate_template_context_injection(self): + """ + Verify that Certificate Templates automatically inject their + file paths, UUIDs, and PEM payloads into the configuration context. + """ + org = self._create_org() + ca = self._create_ca(organization=org) + template = self._create_template( + name="context-injection-test", + type="cert", + ca=ca, + auto_cert=True, + organization=org, + ) + device = self._create_device(organization=org) + config = self._create_config(device=device) + config.templates.add(template) + context = config.get_context() + prefix = f"cert_{template.pk.hex}" + device_cert = config.devicecertificate_set.first() + self.assertIsNotNone(device_cert, "DeviceCertificate was not created") + cert = device_cert.cert + self.assertIn(f"{prefix}_path", context) + self.assertIn(f"{prefix}_key_path", context) + self.assertTrue( + context[f"{prefix}_path"].endswith(f"cert-{template.pk.hex}.pem") + ) + self.assertTrue( + context[f"{prefix}_key_path"].endswith(f"key-{template.pk.hex}.pem") + ) + self.assertIn(f"{prefix}_uuid", context) + self.assertEqual(context[f"{prefix}_uuid"], str(cert.id)) + self.assertIn(f"{prefix}_pem", context) + self.assertIn(f"{prefix}_key", context) + self.assertEqual(context[f"{prefix}_pem"], cert.certificate) + self.assertEqual(context[f"{prefix}_key"], cert.private_key) diff --git a/openwisp_controller/pki/tests/test_api.py b/openwisp_controller/pki/tests/test_api.py index 192562d46..98812d43b 100644 --- a/openwisp_controller/pki/tests/test_api.py +++ b/openwisp_controller/pki/tests/test_api.py @@ -152,7 +152,7 @@ def test_crl_download_api(self): def test_ca_delete_api(self): ca1 = self._create_ca(name="ca1", organization=self._get_org()) path = reverse("pki_api:ca_detail", args=[ca1.pk]) - with self.assertNumQueries(5): + with self.assertNumQueries(6): r = self.client.delete(path) self.assertEqual(r.status_code, 204) self.assertEqual(Ca.objects.count(), 0) @@ -253,7 +253,7 @@ def test_cert_put_api(self): "organization": org2.pk, "notes": "new-notes", } - with self.assertNumQueries(10): + with self.assertNumQueries(11): r = self.client.put(path, data, content_type="application/json") self.assertEqual(r.status_code, 200) self.assertEqual(r.data["name"], "cert1-change") @@ -264,7 +264,7 @@ def test_cert_patch_api(self): cert1 = self._create_cert(name="cert1") path = reverse("pki_api:cert_detail", args=[cert1.pk]) data = {"name": "cert1-change"} - with self.assertNumQueries(8): + with self.assertNumQueries(9): r = self.client.patch(path, data, content_type="application/json") self.assertEqual(r.status_code, 200) self.assertEqual(r.data["name"], "cert1-change") @@ -272,7 +272,7 @@ def test_cert_patch_api(self): def test_cert_delete_api(self): cert1 = self._create_cert(name="cert1") path = reverse("pki_api:cert_detail", args=[cert1.pk]) - with self.assertNumQueries(6): + with self.assertNumQueries(8): r = self.client.delete(path) self.assertEqual(r.status_code, 204) self.assertEqual(Cert.objects.count(), 0) @@ -289,7 +289,7 @@ def test_post_cert_renew_api(self): cert1 = self._create_cert(name="cert1") old_serial_num = cert1.serial_number path = reverse("pki_api:cert_renew", args=[cert1.pk]) - with self.assertNumQueries(5): + with self.assertNumQueries(6): r = self.client.post(path) self.assertEqual(r.status_code, 200) cert1.refresh_from_db() diff --git a/tests/openwisp2/sample_config/migrations/0010_template_blueprint_cert_template_ca_and_more.py b/tests/openwisp2/sample_config/migrations/0010_template_blueprint_cert_template_ca_and_more.py new file mode 100644 index 000000000..359912509 --- /dev/null +++ b/tests/openwisp2/sample_config/migrations/0010_template_blueprint_cert_template_ca_and_more.py @@ -0,0 +1,150 @@ +# Generated by Django 5.2.14 on 2026-05-30 18:33 + +import uuid + +import django.db.models.deletion +import django.utils.timezone +import model_utils.fields +from django.db import migrations, models + +import openwisp_controller.config.base.template + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_config", "0009_replace_jsonfield_with_django_builtin"), + ("sample_pki", "0004_alter_ca_extensions_alter_ca_key_length_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="template", + name="blueprint_cert", + field=models.ForeignKey( + blank=True, + help_text="Optional: Select an unassigned certificate " + "to copy extensions and properties from.", + limit_choices_to=openwisp_controller.config.base.template.get_unassigned_certs, # noqa + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to="sample_pki.cert", + verbose_name="Blueprint Certificate", + ), + ), + migrations.AddField( + model_name="template", + name="ca", + field=models.ForeignKey( + blank=True, + help_text="The Certificate Authority that will sign " + "certificates generated by this template.", + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="sample_pki.ca", + verbose_name="Certificate Authority", + ), + ), + migrations.AlterField( + model_name="template", + name="auto_cert", + field=models.BooleanField( + db_index=True, + default=openwisp_controller.config.base.template.default_auto_cert, + help_text="whether tunnel specific configuration " + "(cryptographic keys, ip addresses, etc) should be " + "automatically generated and managed behind the scenes for " + "each configuration using this template, valid only for the " + "VPN and certificate template types", + verbose_name="automatic tunnel provisioning", + ), + ), + migrations.AlterField( + model_name="template", + name="type", + field=models.CharField( + choices=[ + ("generic", "Generic"), + ("vpn", "VPN-client"), + ("cert", "Certificate"), + ], + db_index=True, + default="generic", + help_text="template type, determines which features are available", + max_length=16, + verbose_name="type", + ), + ), + migrations.CreateModel( + name="DeviceCertificate", + fields=[ + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ( + "created", + model_utils.fields.AutoCreatedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="created", + ), + ), + ( + "modified", + model_utils.fields.AutoLastModifiedField( + default=django.utils.timezone.now, + editable=False, + verbose_name="modified", + ), + ), + ("auto_cert", models.BooleanField(default=False)), + ("details", models.CharField(blank=True, max_length=64, null=True)), + ( + "cert", + models.OneToOneField( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="sample_pki.cert", + ), + ), + ( + "config", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="sample_config.config", + ), + ), + ( + "template", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="sample_config.template", + ), + ), + ], + options={ + "verbose_name": "Device certificate", + "verbose_name_plural": "Device certificates", + "abstract": False, + "unique_together": {("config", "template")}, + }, + ), + migrations.AddField( + model_name="config", + name="device_certificates", + field=models.ManyToManyField( + blank=True, + related_name="config_device_certificates", + through="sample_config.DeviceCertificate", + to="sample_config.template", + verbose_name="device certificates", + ), + ), + ] diff --git a/tests/openwisp2/sample_config/models.py b/tests/openwisp2/sample_config/models.py index e0e6448a5..50d401bbc 100644 --- a/tests/openwisp2/sample_config/models.py +++ b/tests/openwisp2/sample_config/models.py @@ -2,6 +2,7 @@ from openwisp_controller.config.base.config import AbstractConfig from openwisp_controller.config.base.device import AbstractDevice +from openwisp_controller.config.base.device_certificate import AbstractDeviceCertificate from openwisp_controller.config.base.device_group import AbstractDeviceGroup from openwisp_controller.config.base.multitenancy import ( AbstractOrganizationConfigSettings, @@ -95,6 +96,15 @@ class Meta(AbstractVpnClient.Meta): abstract = False +class DeviceCertificate(DetailsModel, AbstractDeviceCertificate): + """ + m2m through model + """ + + class Meta(AbstractDeviceCertificate.Meta): + abstract = False + + class OrganizationConfigSettings(DetailsModel, AbstractOrganizationConfigSettings): """ Configuration management settings diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py index c45eb5537..810d0ba21 100644 --- a/tests/openwisp2/settings.py +++ b/tests/openwisp2/settings.py @@ -281,6 +281,7 @@ CONFIG_TEMPLATE_MODEL = "sample_config.Template" CONFIG_VPN_MODEL = "sample_config.Vpn" CONFIG_VPNCLIENT_MODEL = "sample_config.VpnClient" + CONFIG_DEVICECERTIFICATE_MODEL = "sample_config.DeviceCertificate" CONFIG_ORGANIZATIONCONFIGSETTINGS_MODEL = "sample_config.OrganizationConfigSettings" CONFIG_ORGANIZATIONLIMITS_MODEL = "sample_config.OrganizationLimits" CONFIG_WHOISINFO_MODEL = "sample_config.WHOISInfo"