Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/common/telemetry/server/datastore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func (w metricsWrapper) FetchAttestedNode(ctx context.Context, spiffeID string)
return w.ds.FetchAttestedNode(ctx, spiffeID)
}

func (w metricsWrapper) FetchAttestedNodes(ctx context.Context, spiffeIDs []string) (_ map[string]*common.AttestedNode, err error) {
callCounter := StartFetchNodeCall(w.m)
defer callCounter.Done(&err)
return w.ds.FetchAttestedNodes(ctx, spiffeIDs)
}
Comment on lines +134 to +138

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed this would improve observability. I've left it sharing StartFetchNodeCall for now to match the existing convention — the bulk FetchRegistrationEntries similarly reuses StartFetchRegistrationCall.

To keep telemetry consistent, I'd suggest introducing dedicated batch metrics for both bulk methods (plus the corresponding telemetry_config.md entries) in a separate follow-up rather than diverging here. Let me know if you'd prefer it in this PR.


func (w metricsWrapper) FetchAttestedNodeEvent(ctx context.Context, eventID uint) (_ *datastore.AttestedNodeEvent, err error) {
callCounter := StartFetchAttestedNodeEventCall(w.m)
defer callCounter.Done(&err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/common/telemetry/server/datastore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.node.fetch",
methodName: "FetchAttestedNode",
},
{
key: "datastore.node.fetch",
methodName: "FetchAttestedNodes",
},
{
key: "datastore.node_event.fetch",
methodName: "FetchAttestedNodeEvent",
Expand Down Expand Up @@ -424,6 +428,10 @@ func (ds *fakeDataStore) FetchAttestedNode(context.Context, string) (*common.Att
return &common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) FetchAttestedNodes(context.Context, []string) (map[string]*common.AttestedNode, error) {
return map[string]*common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) FetchAttestedNodeEvent(context.Context, uint) (*datastore.AttestedNodeEvent, error) {
return &datastore.AttestedNodeEvent{}, ds.err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type DataStore interface {
CreateAttestedNode(context.Context, *common.AttestedNode) (*common.AttestedNode, error)
DeleteAttestedNode(ctx context.Context, spiffeID string) (*common.AttestedNode, error)
FetchAttestedNode(ctx context.Context, spiffeID string) (*common.AttestedNode, error)
FetchAttestedNodes(ctx context.Context, spiffeIDs []string) (map[string]*common.AttestedNode, error)
Comment thread
nweisenauer-sap marked this conversation as resolved.
ListAttestedNodes(context.Context, *ListAttestedNodesRequest) (*ListAttestedNodesResponse, error)
UpdateAttestedNode(context.Context, *common.AttestedNode, *common.AttestedNodeMask) (*common.AttestedNode, error)
PruneAttestedExpiredNodes(ctx context.Context, expiredBefore time.Time, includeNonReattestable bool) error
Expand Down Expand Up @@ -161,6 +162,7 @@ type ListAttestedNodesRequest struct {
ByBanned *bool
ByExpiresBefore time.Time
BySelectorMatch *BySelectors
BySpiffeIDs []string
FetchSelectors bool
Pagination *Pagination
ByCanReattest *bool
Expand Down
40 changes: 40 additions & 0 deletions pkg/server/datastore/sqlstore/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,30 @@ func (ds *Plugin) FetchAttestedNode(ctx context.Context, spiffeID string) (attes
return attestedNode, nil
}

// FetchAttestedNodes fetches existing attested nodes by SPIFFE IDs, including their selectors
func (ds *Plugin) FetchAttestedNodes(ctx context.Context, spiffeIDs []string) (map[string]*common.AttestedNode, error) {
nodesMap := make(map[string]*common.AttestedNode)
if len(spiffeIDs) == 0 {
return nodesMap, nil
}

var resp *datastore.ListAttestedNodesResponse
if err := ds.withReadTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = listAttestedNodes(ctx, ds.db, ds.log, &datastore.ListAttestedNodesRequest{
BySpiffeIDs: spiffeIDs,
FetchSelectors: true,
})
return err
}); err != nil {
Comment thread
nweisenauer-sap marked this conversation as resolved.
Outdated
return nil, err
}

for _, node := range resp.Nodes {
nodesMap[node.SpiffeId] = node
}
return nodesMap, nil
}

// CountAttestedNodes counts all attested nodes
func (ds *Plugin) CountAttestedNodes(ctx context.Context, req *datastore.CountAttestedNodesRequest) (count int32, err error) {
if countAttestedNodesHasFilters(req) {
Expand Down Expand Up @@ -2048,6 +2072,14 @@ func buildListAttestedNodesQueryCTE(req *datastore.ListAttestedNodesRequest, dbT
}
}

// Filter by a set of SPIFFE IDs
if len(req.BySpiffeIDs) > 0 {
builder.WriteString("\t\tAND spiffe_id IN (")
builder.WriteString(buildQuestions(req.BySpiffeIDs))
builder.WriteString(")\n")
args = append(args, buildArgs(req.BySpiffeIDs)...)
}

builder.WriteString(")")
// Fetch all selectors from filtered entries
if fetchSelectors {
Expand Down Expand Up @@ -2281,6 +2313,14 @@ FROM attested_node_entries N
builder.WriteString("\t\tAND can_reattest = false\n")
}
}

// Filter by a set of SPIFFE IDs
if len(req.BySpiffeIDs) > 0 {
builder.WriteString(" AND N.spiffe_id IN (")
builder.WriteString(buildQuestions(req.BySpiffeIDs))
builder.WriteString(")")
args = append(args, buildArgs(req.BySpiffeIDs)...)
}
return nil
}

Expand Down
74 changes: 74 additions & 0 deletions pkg/server/datastore/sqlstore/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,80 @@ func (s *PluginSuite) TestFetchAttestedNodeMissing() {
s.Require().Nil(attestedNode)
}

func (s *PluginSuite) TestFetchAttestedNodes() {
createNode := func(spiffeID string, selectors []*common.Selector) *common.AttestedNode {
node, err := s.ds.CreateAttestedNode(ctx, &common.AttestedNode{
SpiffeId: spiffeID,
AttestationDataType: "aws-tag",
CertSerialNumber: "badcafe",
CertNotAfter: time.Now().Add(time.Hour).Unix(),
})
s.Require().NoError(err)
s.setNodeSelectors(spiffeID, selectors)
node.Selectors = selectors
return node
}

node1 := createNode("spiffe://example.org/node1", []*common.Selector{{Type: "a", Value: "1"}})
node2 := createNode("spiffe://example.org/node2", []*common.Selector{{Type: "b", Value: "2"}})
node3 := createNode("spiffe://example.org/node3", []*common.Selector{{Type: "c", Value: "3"}})

// Create a node and then delete it so we can test it doesn't get returned with the fetch
node4 := createNode("spiffe://example.org/node4", []*common.Selector{{Type: "d", Value: "4"}})
deletedNode, err := s.ds.DeleteAttestedNode(ctx, node4.SpiffeId)
s.Require().NoError(err)
s.Require().NotNil(deletedNode)

for _, tt := range []struct {
name string
nodes []*common.AttestedNode
deletedSpiffeID string
}{
{
name: "No nodes",
},
{
name: "Nodes 1 and 2",
nodes: []*common.AttestedNode{node1, node2},
},
{
name: "Nodes 1, 2, and 3",
nodes: []*common.AttestedNode{node1, node2, node3},
},
{
name: "Deleted node",
nodes: []*common.AttestedNode{node2, node3},
deletedSpiffeID: deletedNode.SpiffeId,
},
} {
s.T().Run(tt.name, func(t *testing.T) {
spiffeIDs := make([]string, 0, len(tt.nodes))
for _, node := range tt.nodes {
spiffeIDs = append(spiffeIDs, node.SpiffeId)
}
fetchedNodes, err := s.ds.FetchAttestedNodes(ctx, append(spiffeIDs, tt.deletedSpiffeID))
s.Require().NoError(err)

// Make sure all nodes we want to fetch are present, including selectors.
s.Require().Equal(len(tt.nodes), len(fetchedNodes))
for _, node := range tt.nodes {
fetchedNode, ok := fetchedNodes[node.SpiffeId]
s.Require().True(ok)
s.RequireProtoEqual(node, fetchedNode)
}

// Make sure any deleted nodes are not present.
_, ok := fetchedNodes[tt.deletedSpiffeID]
s.Require().False(ok)
})
}

// An empty request returns an empty map.
fetchedNodes, err := s.ds.FetchAttestedNodes(ctx, nil)
s.Require().NoError(err)
s.Require().Empty(fetchedNodes)
}

func (s *PluginSuite) TestListAttestedNodes() {
// Connection is never used, each test creates a connection to a different database
s.ds.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/endpoints/authorized_entryfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (a *AuthorizedEntryFetcherEvents) buildCache(ctx context.Context) error {
return err
}

attestedNodes, err := buildAttestedNodesCache(ctx, a.c.log, a.c.metrics, a.c.ds, a.c.clk, cache, a.c.nodeCache, a.c.cacheReloadInterval, a.c.eventTimeout)
attestedNodes, err := buildAttestedNodesCache(ctx, a.c.log, a.c.metrics, a.c.ds, a.c.clk, cache, a.c.nodeCache, pageSize, a.c.cacheReloadInterval, a.c.eventTimeout)
if err != nil {
return err
}
Expand Down
53 changes: 33 additions & 20 deletions pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"time"

"github.com/andres-erbsen/clock"
Expand Down Expand Up @@ -35,6 +37,7 @@ type attestedNodes struct {

eventTracker *eventTracker
eventTimeout time.Duration
pageSize int32

fetchNodes map[string]struct{}

Expand Down Expand Up @@ -158,7 +161,11 @@ func (a *attestedNodes) loadCache(ctx context.Context) error {

// buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache.
// It runs once at startup.
func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, nodeCache *nodecache.Cache, cacheReloadInterval, eventTimeout time.Duration) (*attestedNodes, error) {
func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, nodeCache *nodecache.Cache, pageSize int32, cacheReloadInterval, eventTimeout time.Duration) (*attestedNodes, error) {
if pageSize <= 0 {
return nil, fmt.Errorf("page size must be positive, got %d", pageSize)
}

pollPeriods := PollPeriods(cacheReloadInterval, eventTimeout)

attestedNodes := &attestedNodes{
Expand All @@ -169,6 +176,7 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metric
log: log,
metrics: metrics,
eventTimeout: eventTimeout,
pageSize: pageSize,

eventsBeforeFirst: make(map[uint]struct{}),
fetchNodes: make(map[string]struct{}),
Expand Down Expand Up @@ -211,34 +219,39 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
}

func (a *attestedNodes) updateCachedNodes(ctx context.Context) error {
for spiffeId := range a.fetchNodes {
node, err := a.ds.FetchAttestedNode(ctx, spiffeId)
spiffeIds := slices.Collect(maps.Keys(a.fetchNodes))
for pageStart := 0; pageStart < len(spiffeIds); pageStart += int(a.pageSize) {
fetchNodes := a.fetchNodesPage(spiffeIds, pageStart)
nodes, err := a.ds.FetchAttestedNodes(ctx, fetchNodes)
if err != nil {
continue
return err
}
Comment on lines +222 to 228

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional, to stay consistent with updateCachedEntries (the registration-entry equivalent from #5970), which also returns on a failed page fetch.

There's no data loss or extended-stale risk: IDs are only removed from fetchNodes on success or confirmed deletion, so a failed page (and any later pages) stays queued and is retried on the next reload tick. It also surfaces the error in logs/telemetry, whereas the old per-node continue silently swallowed persistent failures — relevant since this PR targets the skipped-event spikes in #6876.

I'd prefer to keep the two cache paths symmetric, but I'm happy to switch to per-page skip if you'd rather change both here and in the entries path.


// Node was deleted
if node == nil {
a.nodeCache.RemoveAttestedNode(spiffeId)
a.cache.RemoveAgent(spiffeId)
delete(a.fetchNodes, spiffeId)
continue
}
for _, spiffeId := range fetchNodes {
node, ok := nodes[spiffeId]
// Node was deleted
if !ok {
a.nodeCache.RemoveAttestedNode(spiffeId)
a.cache.RemoveAgent(spiffeId)
delete(a.fetchNodes, spiffeId)
continue
}

selectors, err := a.ds.GetNodeSelectors(ctx, spiffeId, datastore.RequireCurrent)
if err != nil {
continue
agentExpiresAt := time.Unix(node.CertNotAfter, 0)
a.cache.UpdateAgent(node.SpiffeId, agentExpiresAt, api.ProtoFromSelectors(node.Selectors))
a.nodeCache.UpdateAttestedNode(node)
Comment thread
nweisenauer-sap marked this conversation as resolved.
delete(a.fetchNodes, spiffeId)
}
node.Selectors = selectors

agentExpiresAt := time.Unix(node.CertNotAfter, 0)
a.cache.UpdateAgent(node.SpiffeId, agentExpiresAt, api.ProtoFromSelectors(node.Selectors))
a.nodeCache.UpdateAttestedNode(node)
delete(a.fetchNodes, spiffeId)
}
return nil
}

// fetchNodesPage gets the range for the page starting at pageStart
func (a *attestedNodes) fetchNodesPage(spiffeIds []string, pageStart int) []string {
pageEnd := min(len(spiffeIds), pageStart+int(a.pageSize))
return spiffeIds[pageStart:pageEnd]
}

func (a *attestedNodes) emitMetrics() {
if a.skippedNodeEvents != a.eventTracker.EventCount() {
a.skippedNodeEvents = a.eventTracker.EventCount()
Expand Down
Loading
Loading