Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 22 additions & 64 deletions tests/kafkatest/tests/streams/streams_broker_bounce_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,40 @@
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.mark import matrix
from ducktape.mark import ignore
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
import time
import signal
from random import randint

def broker_node(test, topic, broker_type):
""" Discover node of requested type. For leader type, discovers leader for our topic and partition 0
"""
if broker_type == "leader":
node = test.kafka.leader(topic, partition=0)
elif broker_type == "controller":
node = test.kafka.controller()
else:
raise Exception("Unexpected broker type %s." % (broker_type))

return node
def leader_node(test, topic):
"""Discover leader for our topic and partition 0."""
return test.kafka.leader(topic, partition=0)

def signal_node(test, node, sig):
test.kafka.signal_node(node, sig)

def clean_shutdown(test, topic, broker_type):
"""Discover broker node of requested type and shut it down cleanly.
"""
node = broker_node(test, topic, broker_type)
def clean_shutdown(test, topic):
"""Discover leader broker node and shut it down cleanly."""
node = leader_node(test, topic)
signal_node(test, node, signal.SIGTERM)

def hard_shutdown(test, topic, broker_type):
"""Discover broker node of requested type and shut it down with a hard kill."""
node = broker_node(test, topic, broker_type)
def hard_shutdown(test, topic):
"""Discover leader broker node and shut it down with a hard kill."""
node = leader_node(test, topic)
signal_node(test, node, signal.SIGKILL)

def clean_bounce(test, topic, broker_type):
def clean_bounce(test, topic):
"""Chase the leader of one partition and restart it cleanly a few times (5 times)."""
for i in range(5):
prev_broker_node = broker_node(test, topic, broker_type)
prev_broker_node = leader_node(test, topic)
test.kafka.restart_node(prev_broker_node, clean_shutdown=True)


def hard_bounce(test, topic, broker_type):
def hard_bounce(test, topic):
"""Chase the leader and restart it with a hard kill. Do this a few times (5)."""
for i in range(5):
prev_broker_node = broker_node(test, topic, broker_type)
prev_broker_node = leader_node(test, topic)
test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)

wait_until(lambda: not test.kafka.pids(prev_broker_node),
Expand Down Expand Up @@ -145,11 +135,11 @@ def __init__(self, test_context):
'configs': {"min.insync.replicas": 2} }
}

def fail_broker_type(self, failure_mode, broker_type):
def fail_leader(self, failure_mode):
# Pick a random topic and bounce it's leader
topic_index = randint(0, len(self.topics.keys()) - 1)
topic = list(self.topics.keys())[topic_index]
failures[failure_mode](self, topic, broker_type)
failures[failure_mode](self, topic)

def fail_many_brokers(self, failure_mode, num_failures):
many_failures[failure_mode](self, num_failures)
Expand Down Expand Up @@ -194,7 +184,7 @@ def setup_system(self, start_processor=True, num_threads=3, group_protocol='clas
if (start_processor):
self.processor1.start()

def collect_results(self, sleep_time_secs):
def collect_results(self):
data = {}
# End test
self.driver.wait()
Expand All @@ -204,13 +194,7 @@ def collect_results(self, sleep_time_secs):

node = self.driver.node

# Success is declared if streams does not crash when sleep time > 0
# It should give an exception when sleep time is 0 since we kill the brokers immediately
# and the topic manager cannot create internal topics with the desired replication factor
if (sleep_time_secs == 0):
output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-EXCEPTION %s" % self.processor1.STDOUT_FILE, allow_fail=False)
else:
output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
output_streams = self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)

for line in output_streams:
data["Client closed"] = line
Expand All @@ -229,12 +213,11 @@ def collect_results(self, sleep_time_secs):

@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader"],
num_threads=[1, 3],
sleep_time_secs=[120],
metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"])
def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum, group_protocol):
def test_broker_type_bounce(self, failure_mode, sleep_time_secs, num_threads, metadata_quorum, group_protocol):
"""
Start a smoke test client, then kill one particular broker and ensure data is still received
Record if records are delivered.
Expand All @@ -247,34 +230,9 @@ def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, nu
time.sleep(sleep_time_secs)

# Fail brokers
self.fail_broker_type(failure_mode, broker_type)

return self.collect_results(sleep_time_secs)

@ignore
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown"],
broker_type=["controller"],
sleep_time_secs=[0],
metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"])
def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum, group_protocol):
"""
Start a smoke test client, then kill one particular broker immediately before streams stats
Streams should throw an exception since it cannot create topics with the desired
replication factor of 3
"""
self.setup_system(start_processor=False, group_protocol=group_protocol)

# Sleep to allow test to run for a bit
time.sleep(sleep_time_secs)

# Fail brokers
self.fail_broker_type(failure_mode, broker_type)

self.processor1.start()
self.fail_leader(failure_mode)

return self.collect_results(sleep_time_secs)
return self.collect_results()

@cluster(num_nodes=10)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
Expand All @@ -294,7 +252,7 @@ def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum,
# Fail brokers
self.fail_many_brokers(failure_mode, num_failures)

return self.collect_results(120)
return self.collect_results()

@cluster(num_nodes=10)
@matrix(failure_mode=["clean_bounce", "hard_bounce"],
Expand All @@ -321,4 +279,4 @@ def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, g
# Fail brokers
self.fail_many_brokers(failure_mode, num_failures)

return self.collect_results(120)
return self.collect_results()
Loading