Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/out_kinesis_streams/kinesis_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#ifndef FLB_OUT_KINESIS_API
#define FLB_OUT_KINESIS_API

#define PUT_RECORDS_PAYLOAD_SIZE 5242880
#define PUT_RECORDS_PAYLOAD_SIZE 10485760
#define MAX_EVENTS_PER_PUT 500
#define MAX_EVENT_SIZE 1048556 /* 1048576 - 20 bytes for partition key */
#define MAX_B64_EVENT_SIZE 1398076 /* ceil(1048556 / 3) * 4 */
Expand Down
159 changes: 159 additions & 0 deletions tests/runtime/out_kinesis.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
/* Test data */
#include "data/td/json_td.h" /* JSON_TD */

/* Kinesis Streams API constants */
#include "../../plugins/out_kinesis_streams/kinesis_api.h"

#define ERROR_THROUGHPUT "{\"__type\":\"ServiceUnavailableException\"}"
/* not a real error code, but tests that the code can respond to any error */
#define ERROR_UNKNOWN "{\"__type\":\"UNKNOWN\"}"
Expand Down Expand Up @@ -595,6 +598,159 @@ void flb_test_kinesis_compression_snappy_with_aggregation(void)
flb_destroy(ctx);
}

/*
* Helper function to create a log-event record in [timestamp, {"message":"..."}]
* format with a message payload of the specified target_json_size.
* target_json_size is the desired size of the inner JSON object {"message":"..."}.
* Returns a malloc'd string the caller must free, and sets *out_len.
*/
static char* create_large_log_event(size_t target_json_size, size_t *out_len)
{
const char *prefix = "[1, {\"message\":\"";
const char *suffix = "\"}]";
size_t prefix_len = strlen(prefix);
size_t suffix_len = strlen(suffix);
size_t json_prefix_len = strlen("{\"message\":\"");
size_t json_suffix_len = strlen("\"}");
size_t json_overhead = json_prefix_len + json_suffix_len;
size_t fill_size;
size_t total_len;
char *record;

if (target_json_size < json_overhead + 1) {
return NULL;
}

fill_size = target_json_size - json_overhead;
total_len = prefix_len + fill_size + suffix_len;

record = flb_malloc(total_len + 1);
if (!record) {
return NULL;
}

memcpy(record, prefix, prefix_len);
memset(record + prefix_len, 'A', fill_size);
memcpy(record + prefix_len + fill_size, suffix, suffix_len);
record[total_len] = '\0';

*out_len = total_len;
return record;
}

/* Helper to setup and run a Kinesis Streams test with custom record data */
static void run_kinesis_test_with_data(char *data, size_t data_len)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

setenv("FLB_KINESIS_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();
TEST_CHECK(ctx != NULL);

in_ffd = flb_input(ctx, (char *) "lib", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "tag", "test", NULL);

out_ffd = flb_output(ctx, (char *) "kinesis_streams", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "*", NULL);
flb_output_set(ctx, out_ffd, "region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd, "stream", "fluent", NULL);
flb_output_set(ctx, out_ffd, "Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

if (data) {
flb_lib_push(ctx, in_ffd, data, data_len);
}

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

/*
* Test event size just under the maximum allowed limit (should succeed).
* MAX_EVENT_SIZE is 1048556 (1 MiB - 20 bytes for partition key).
* The plugin discards when (written + 1) >= MAX_EVENT_SIZE, so the maximum
* accepted JSON size is MAX_EVENT_SIZE - 2.
*/
void flb_test_kinesis_event_size_at_limit(void)
{
char *record;
size_t record_len;

record = create_large_log_event(MAX_EVENT_SIZE - 2, &record_len);
TEST_CHECK(record != NULL);

if (record) {
run_kinesis_test_with_data(record, record_len);
flb_free(record);
}
}

/*
* Test event size exceeding limit (should be discarded by the plugin).
* The plugin logs a warning and drops the record without crashing.
*/
void flb_test_kinesis_event_size_over_limit(void)
{
char *record;
size_t record_len;

record = create_large_log_event(MAX_EVENT_SIZE + 100, &record_len);
TEST_CHECK(record != NULL);

if (record) {
run_kinesis_test_with_data(record, record_len);
flb_free(record);
}
}

/*
* Test event with backslash escape sequences near the size boundary.
* Validates the plugin handles special characters correctly when the
* record exceeds the limit.
*/
void flb_test_kinesis_event_size_with_backslash(void)
{
char *record;
size_t record_len;
size_t prefix_len = strlen("[1, {\"message\":\"");
size_t suffix_len = strlen("\"}]");
size_t fill_size;
size_t boundary;
size_t i;

record = create_large_log_event(MAX_EVENT_SIZE + 100, &record_len);
TEST_CHECK(record != NULL);

if (record) {
fill_size = record_len - prefix_len - suffix_len;

/* Replace pairs of characters with valid escape sequence "\\" */
for (i = 98; i < fill_size - 1; i += 100) {
record[prefix_len + i] = '\\';
record[prefix_len + i + 1] = '\\';
}

/* Ensure a backslash pair is near the MAX_EVENT_SIZE boundary */
boundary = MAX_EVENT_SIZE - 1;
if (boundary + 1 < record_len - suffix_len) {
record[boundary] = '\\';
record[boundary + 1] = '\\';
}

run_kinesis_test_with_data(record, record_len);
flb_free(record);
}
}

/* Test list */
TEST_LIST = {
{"success", flb_test_firehose_success },
Expand All @@ -613,5 +769,8 @@ TEST_LIST = {
{"compression_zstd", flb_test_kinesis_compression_zstd },
{"compression_snappy", flb_test_kinesis_compression_snappy },
{"compression_snappy_with_aggregation", flb_test_kinesis_compression_snappy_with_aggregation },
{"event_size_at_limit", flb_test_kinesis_event_size_at_limit },
{"event_size_over_limit", flb_test_kinesis_event_size_over_limit },
{"event_size_with_backslash", flb_test_kinesis_event_size_with_backslash },
{NULL, NULL}
};
Loading