Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from neutron.db.models.l3 import FloatingIP
from neutron.db.qos import models as qos_models
from neutron_lib import exceptions as n_exc
from neutron_lib.db import api as db_api

from oslo_config import cfg

Expand Down Expand Up @@ -563,56 +564,6 @@ def delete_live_migration(self, name, mod_revision=None):
def add_port_interface_name(self, port, port_extra):
port_extra.interface_name = "tap" + port["id"][:11]

def get_security_groups_for_port(self, context, port):
"""Checks which security groups apply for a given port.

Frustratingly, the port dict provided to us when we call get_port may
actually be out of date, and I don't know why. This change ensures that
we get the most recent information.
"""
filters = {"port_id": [port["id"]]}
bindings = self.db._get_port_security_group_bindings(context, filters=filters)
return [binding["security_group_id"] for binding in bindings]

def get_fixed_ips_for_port(self, context, port):
"""Obtains a complete list of fixed IPs for a port.

Much like with security groups, for some insane reason we're given an
out of date port dictionary when we call get_port. This forces an
explicit query of the IPAllocation table to get the right data out of
Neutron.
"""
return [
{"subnet_id": ip["subnet_id"], "ip_address": ip["ip_address"]}
for ip in context.session.query(models_v2.IPAllocation).filter_by(
port_id=port["id"]
)
]

def get_floating_ips_for_port(self, context, port):
"""Obtains a list of floating IPs for a port."""
return [
{"int_ip": ip["fixed_ip_address"], "ext_ip": ip["floating_ip_address"]}
for ip in context.session.query(FloatingIP).filter_by(
fixed_port_id=port["id"]
)
]

def get_network_properties_for_port(self, context, port, port_extra):
network = (
context.session.query(models_v2.Network)
.filter_by(id=port["network_id"])
.first()
)

try:
port_extra.network_name = datamodel_v3.sanitize_label_name_value(
network["name"],
NETWORK_NAME_MAX_LENGTH,
)
except Exception:
LOG.warning(f"Failed to find network name for port {port['id']}")

def get_extra_port_information(self, context, port):
"""get_extra_port_information

Expand All @@ -623,16 +574,86 @@ def get_extra_port_information(self, context, port):
if self._bulk is not None:
return self._get_extra_port_information_from_bulk(context, port)
port_extra = PortExtra()
port_extra.fixed_ips = self.get_fixed_ips_for_port(context, port)
port_extra.floating_ips = self.get_floating_ips_for_port(context, port)
port_extra.security_groups = self.get_security_groups_for_port(context, port)
self.get_network_properties_for_port(context, port, port_extra)

# Collect information that uses raw queries into the Neutron DB. These queries
# need ``session.in_transaction()`` to be True, or else SQLAlchemy drops huge
# WARNING tracebacks saying "ORM session: SQL execution without transaction in
# progress". We arrange for that by using the ``CONTEXT_READER`` wrapper.
with db_api.CONTEXT_READER.using(context):
# We may have an out of date or incomplete port dict at this point.
# Explicitly query the IPAllocation table to get latest fixed IP data.
port_extra.fixed_ips = [
{"subnet_id": ip["subnet_id"], "ip_address": ip["ip_address"]}
for ip in context.session.query(models_v2.IPAllocation).filter_by(
port_id=port["id"]
)
]

# Similarly for floating IPs.
port_extra.floating_ips = [
{"int_ip": ip["fixed_ip_address"], "ext_ip": ip["floating_ip_address"]}
for ip in context.session.query(FloatingIP).filter_by(
fixed_port_id=port["id"]
)
]

# And security groups.
port_extra.security_groups = [
binding["security_group_id"]
for binding in self.db._get_port_security_group_bindings(
context, filters={"port_id": [port["id"]]}
)
]

# Read the Network so we can get its name.
network = (
context.session.query(models_v2.Network)
.filter_by(id=port["network_id"])
.first()
)
try:
port_extra.network_name = datamodel_v3.sanitize_label_name_value(
network["name"],
NETWORK_NAME_MAX_LENGTH,
)
except Exception:
LOG.warning(f"Failed to find network name for port {port['id']}")
Comment thread
nelljerram marked this conversation as resolved.
Outdated

# Read QoS rules. We build port_extra.qos here, inside the
# reader, so that the per-rule attribute accesses inside
# build_qos_controls happen while the rule ORM objects are
# still attached to the session. Calling build_qos_controls
# after the reader exited would work today (the columns we
# access are simple eager-loaded ones, and oslo.db's reader
# mode typically leaves detached attributes readable), but
# would tie our correctness to oslo.db's expire_on_commit /
# rollback_reader_sessions configuration. build_qos_controls
# is pure compute -- no extra SQL -- so calling it inside
# the reader is cheap and decouples us from those internals.
qos_policy_id = port.get("qos_policy_id") or port.get(
"qos_network_policy_id"
)
LOG.debug("QoS Policy ID = %r", qos_policy_id)
if qos_policy_id:
bw_rules = context.session.query(
qos_models.QosBandwidthLimitRule
).filter_by(qos_policy_id=qos_policy_id)
pr_rules = context.session.query(
qos_models.QosPacketRateLimitRule
).filter_by(qos_policy_id=qos_policy_id)
Comment thread
nelljerram marked this conversation as resolved.
else:
bw_rules = []
pr_rules = []
port_extra.qos = self.build_qos_controls(bw_rules, pr_rules)

# Now processing that either MUST be outside of any transaction - because it
# will call @retry_if_session_inactive-decorated calls that only work correctly
# when not in an outer transaction - or that doesn't involve the DB at all and
# so doesn't care about transaction state.
self.add_port_gateways(context, port_extra)
Comment thread
nelljerram marked this conversation as resolved.
self.add_port_interface_name(port, port_extra)
self.add_port_project_data(port, context, port_extra)
self.add_port_sg_names(context, port_extra)
self.add_port_qos(port, context, port_extra)

return port_extra

Expand Down Expand Up @@ -696,18 +717,29 @@ def _get_extra_port_information_from_bulk(self, context, port):
self.add_port_project_data(port, context, port_extra)

# QoS — use bulk-prefetched rules.
self.add_port_qos(port, context, port_extra)
qos_policy_id = port.get("qos_policy_id") or port.get("qos_network_policy_id")
LOG.debug("QoS Policy ID = %r", qos_policy_id)
if qos_policy_id:
bw_rules = bulk["qos_bw_by_policy"].get(qos_policy_id, [])
pr_rules = bulk["qos_pr_by_policy"].get(qos_policy_id, [])
else:
bw_rules = []
pr_rules = []

port_extra.qos = self.build_qos_controls(bw_rules, pr_rules)

return port_extra

def add_port_gateways(self, context, port_extra):
"""add_port_gateways

Determine the gateway IP addresses for a given port's IP addresses, and
adds them to the port dict.
Determine the gateway IP addresses for a given port's IP addresses, and adds
them to the port dict.

This method assumes it's being called from within a database
transaction and does not take out another one.
The ``self.db.get_subnet`` call is ``@retry_if_session_inactive`` +
``@CONTEXT_READER``-decorated, so this method MUST run without any outer
transaction we own (otherwise the retry decorator would be disabled). Each call
opens and closes its own reader transaction internally.
"""
for ip in port_extra.fixed_ips:
subnet = self.db.get_subnet(context, ip["subnet_id"])
Expand All @@ -718,14 +750,16 @@ def add_port_sg_names(self, context, port_extra):

Determine and store the name of each security group that a port uses.

This method assumes it's being called from within a database
transaction and does not take out another one.
The ``self.db.get_security_groups`` call is ``@retry_if_session_inactive`` +
``@CONTEXT_READER``-decorated, so this method MUST run without any outer
transaction we own. The retry decorator does the recovery for the
``_ensure_default_security_group`` race that ``default_sg=True`` (below) is
meant to side-step.
"""
# Oddly, get_security_groups normally tries to create the default SG
# for the current tenant, and that can hit a
# NeutronDbObjectDuplicateEntry exception - presumably if there's a
# race with multiple servers or threads trying to do this at the same
# time. Adding "default_sg=True" here suppresses that creation
# Oddly, get_security_groups normally tries to create the default SG for the
# current tenant, and that can hit a NeutronDbObjectDuplicateEntry exception -
# presumably if there's a race with multiple servers or threads trying to do
# this at the same time. Adding "default_sg=True" here suppresses that creation
# attempt.
filters = {"id": port_extra.security_groups}
for sg in self.db.get_security_groups(
Expand All @@ -736,14 +770,9 @@ def add_port_sg_names(self, context, port_extra):
)
port_extra.security_group_names[sg["id"]] = sg_name

def add_port_qos(self, port, context, port_extra):
"""add_port_qos

Determine and store QoS parameters for a port.

This method assumes it's being called from within a database
transaction and does not take out another one.
"""
@staticmethod
def build_qos_controls(bw_rules, pr_rules):
"""Build QoSControls dict, from the given rules and config."""
qos = {}

# Minima, maxima and defaults as specified in the WorkloadEndpoint API,
Expand All @@ -767,41 +796,25 @@ def cap(setting, minmax):
setting = max
return setting

qos_policy_id = port.get("qos_policy_id") or port.get("qos_network_policy_id")
LOG.debug("QoS Policy ID = %r", qos_policy_id)
if qos_policy_id:
# Prefer bulk-prefetched rules when running inside resync;
# otherwise do per-port queries (e.g. on postcommit hooks).
if self._bulk is not None:
bw_rules = self._bulk["qos_bw_by_policy"].get(qos_policy_id, [])
pr_rules = self._bulk["qos_pr_by_policy"].get(qos_policy_id, [])
else:
bw_rules = context.session.query(
qos_models.QosBandwidthLimitRule
).filter_by(qos_policy_id=qos_policy_id)
pr_rules = context.session.query(
qos_models.QosPacketRateLimitRule
).filter_by(qos_policy_id=qos_policy_id)

for r in bw_rules:
LOG.debug("BW rule = %r", r)
direction = r.get("direction", "egress")
if r["max_kbps"] != 0:
qos[direction + "Bandwidth"] = cap(
r["max_kbps"] * 1000, MINMAX_BANDWIDTH
)
if r["max_burst_kbps"] != 0:
qos[direction + "Peakrate"] = cap(
r["max_burst_kbps"] * 1000, MINMAX_BW_PEAKRATE
)
for r in bw_rules:
LOG.debug("BW rule = %r", r)
direction = r.get("direction", "egress")
if r["max_kbps"] != 0:
qos[direction + "Bandwidth"] = cap(
r["max_kbps"] * 1000, MINMAX_BANDWIDTH
)
if r["max_burst_kbps"] != 0:
qos[direction + "Peakrate"] = cap(
r["max_burst_kbps"] * 1000, MINMAX_BW_PEAKRATE
)

for r in pr_rules:
LOG.debug("PR rule = %r", r)
direction = r.get("direction", "egress")
if r["max_kpps"] != 0:
qos[direction + "PacketRate"] = cap(
r["max_kpps"] * 1000, MINMAX_PACKET_RATE
)
for r in pr_rules:
LOG.debug("PR rule = %r", r)
direction = r.get("direction", "egress")
if r["max_kpps"] != 0:
qos[direction + "PacketRate"] = cap(
r["max_kpps"] * 1000, MINMAX_PACKET_RATE
)

if cfg.CONF.calico.max_ingress_connections_per_port != 0:
qos["ingressMaxConnections"] = cap(
Expand Down Expand Up @@ -852,7 +865,7 @@ def cap(setting, minmax):
else:
qos["egressPacketBurst"] = calico_config.DEFAULT_PR_BURST

port_extra.qos = qos
return qos

def add_port_project_data(self, port, context, port_extra):
"""add_port_project_data
Expand Down
Loading