diff --git a/plugins/out_loki/loki.c b/plugins/out_loki/loki.c index d24c1531ef3..928f8bba927 100644 --- a/plugins/out_loki/loki.c +++ b/plugins/out_loki/loki.c @@ -35,15 +35,18 @@ #include "loki.h" -struct flb_loki_dynamic_tenant_id_entry { - flb_sds_t value; - struct cfl_list _head; +struct flb_loki_tenant_group { + flb_sds_t tenant_id; + int records; + struct mk_list _head; }; +#define FLB_LOKI_TENANT_GROUP_FLUSH_SUCCESS (((uint64_t) 1) << 0) +#define FLB_LOKI_TENANT_GROUP_FLUSH_ERROR (((uint64_t) 1) << 1) +#define FLB_LOKI_TENANT_GROUP_FLUSH_RETRY (((uint64_t) 1) << 2) + pthread_once_t initialization_guard = PTHREAD_ONCE_INIT; -FLB_TLS_DEFINE(struct flb_loki_dynamic_tenant_id_entry, - thread_local_tenant_id); struct flb_loki_remove_mpa_entry { struct flb_mp_accessor *mpa; struct cfl_list _head; @@ -52,41 +55,9 @@ FLB_TLS_DEFINE(struct flb_loki_remove_mpa_entry, thread_local_remove_mpa); void initialize_thread_local_storage() { - FLB_TLS_INIT(thread_local_tenant_id); FLB_TLS_INIT(thread_local_remove_mpa); } -static struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id_create() { - struct flb_loki_dynamic_tenant_id_entry *entry; - - entry = (struct flb_loki_dynamic_tenant_id_entry *) \ - flb_calloc(1, sizeof(struct flb_loki_dynamic_tenant_id_entry)); - - if (entry != NULL) { - entry->value = NULL; - - cfl_list_entry_init(&entry->_head); - } - - return entry; -} - -static void dynamic_tenant_id_destroy(struct flb_loki_dynamic_tenant_id_entry *entry) { - if (entry != NULL) { - if (entry->value != NULL) { - flb_sds_destroy(entry->value); - - entry->value = NULL; - } - - if (!cfl_list_entry_is_orphan(&entry->_head)) { - cfl_list_del(&entry->_head); - } - - flb_free(entry); - } -} - static struct flb_loki_remove_mpa_entry *remove_mpa_entry_create(struct flb_loki *ctx) { struct flb_loki_remove_mpa_entry *entry; @@ -1228,6 +1199,22 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins, return NULL; } + /* tenant_id_key split request error handling */ + if (strcasecmp(ctx->tenant_id_key_error_handling, "partial_success") == 0) { + ctx->out_tenant_id_key_error_handling = + FLB_LOKI_TENANT_ID_KEY_ERROR_PARTIAL_SUCCESS; + } + else if (strcasecmp(ctx->tenant_id_key_error_handling, "partial_error") == 0) { + ctx->out_tenant_id_key_error_handling = + FLB_LOKI_TENANT_ID_KEY_ERROR_PARTIAL_ERROR; + } + else { + flb_plg_error(ctx->ins, + "invalid 'tenant_id_key_error_handling' value: %s", + ctx->tenant_id_key_error_handling); + return NULL; + } + /* use TLS ? */ if (ins->use_tls == FLB_TRUE) { io_flags = FLB_IO_TLS; @@ -1359,26 +1346,28 @@ static void pack_format_line_value(flb_sds_t *buf, msgpack_object *val) } } -// seek tenant id from map and set it to dynamic_tenant_id -static int get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map, - flb_sds_t *dynamic_tenant_id) +static flb_sds_t get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map, + int warn) { struct flb_ra_value *rval = NULL; flb_sds_t tmp_str; - int cmp_len; rval = flb_ra_get_value_object(ctx->ra_tenant_id_key, *map); if (rval == NULL) { - flb_plg_warn(ctx->ins, "the value of %s is missing", - ctx->tenant_id_key_config); - return -1; + if (warn == FLB_TRUE) { + flb_plg_warn(ctx->ins, "the value of %s is missing", + ctx->tenant_id_key_config); + } + return NULL; } else if (rval->o.type != MSGPACK_OBJECT_STR) { - flb_plg_warn(ctx->ins, "the value of %s is not string", - ctx->tenant_id_key_config); + if (warn == FLB_TRUE) { + flb_plg_warn(ctx->ins, "the value of %s is not string", + ctx->tenant_id_key_config); + } flb_ra_key_value_destroy(rval); - return -1; + return NULL; } tmp_str = flb_sds_create_len(rval->o.via.str.ptr, @@ -1386,39 +1375,56 @@ static int get_tenant_id_from_record(struct flb_loki *ctx, msgpack_object *map, if (tmp_str == NULL) { flb_plg_warn(ctx->ins, "cannot create tenant ID string from record"); flb_ra_key_value_destroy(rval); - return -1; + return NULL; } - // check if already dynamic_tenant_id is set. - if (*dynamic_tenant_id != NULL) { - cmp_len = flb_sds_len(*dynamic_tenant_id); + flb_ra_key_value_destroy(rval); + return tmp_str; +} - if ((rval->o.via.str.size == cmp_len) && - flb_sds_cmp(tmp_str, *dynamic_tenant_id, cmp_len) == 0) { - // tenant_id is same. nothing to do. - flb_ra_key_value_destroy(rval); - flb_sds_destroy(tmp_str); +static int tenant_id_matches(flb_sds_t left, flb_sds_t right) +{ + if (left == NULL && right == NULL) { + return FLB_TRUE; + } - return 0; - } + if (left == NULL || right == NULL) { + return FLB_FALSE; + } - flb_plg_warn(ctx->ins, "Tenant ID is overwritten %s -> %s", - *dynamic_tenant_id, tmp_str); + if (flb_sds_len(left) != flb_sds_len(right)) { + return FLB_FALSE; + } - flb_sds_destroy(*dynamic_tenant_id); + if (flb_sds_cmp(left, right, flb_sds_len(left)) == 0) { + return FLB_TRUE; } - // this sds will be released after setting http header. - *dynamic_tenant_id = tmp_str; - flb_plg_debug(ctx->ins, "Tenant ID is %s", *dynamic_tenant_id); + return FLB_FALSE; +} - flb_ra_key_value_destroy(rval); - return 0; +static flb_sds_t get_effective_tenant_id(struct flb_loki *ctx, + msgpack_object *record, + int warn) +{ + flb_sds_t tenant_id = NULL; + + if (ctx->ra_tenant_id_key && record->type == MSGPACK_OBJECT_MAP) { + tenant_id = get_tenant_id_from_record(ctx, record, warn); + } + + if (tenant_id == NULL && ctx->tenant_id != NULL) { + tenant_id = flb_sds_create_len(ctx->tenant_id, flb_sds_len(ctx->tenant_id)); + if (tenant_id == NULL) { + flb_errno(); + } + } + + return tenant_id; } static int pack_record(struct flb_loki *ctx, msgpack_packer *mp_pck, msgpack_object *rec, - flb_sds_t *dynamic_tenant_id, struct flb_mp_accessor *remove_mpa, struct flb_config *config) { @@ -1436,14 +1442,6 @@ static int pack_record(struct flb_loki *ctx, msgpack_unpacked mp_buffer; size_t off = 0; - /* - * Get tenant id from record before removing keys. - * https://github.com/fluent/fluent-bit/issues/6207 - */ - if (ctx->ra_tenant_id_key && rec->type == MSGPACK_OBJECT_MAP) { - get_tenant_id_from_record(ctx, rec, dynamic_tenant_id); - } - /* Remove keys in remove_keys */ msgpack_unpacked_init(&mp_buffer); if (remove_mpa) { @@ -1587,18 +1585,6 @@ static int cb_loki_init(struct flb_output_instance *ins, return -1; } - result = pthread_mutex_init(&ctx->dynamic_tenant_list_lock, NULL); - - if (result != 0) { - flb_errno(); - - flb_plg_error(ins, "cannot initialize dynamic tenant id list lock"); - - loki_config_destroy(ctx); - - return -1; - } - result = pthread_once(&initialization_guard, initialize_thread_local_storage); @@ -1612,7 +1598,6 @@ static int cb_loki_init(struct flb_output_instance *ins, return -1; } - cfl_list_init(&ctx->dynamic_tenant_list); result = pthread_mutex_init(&ctx->remove_mpa_list_lock, NULL); if (result != 0) { flb_errno(); @@ -1639,7 +1624,8 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, int total_records, char *tag, int tag_len, const void *data, size_t bytes, - flb_sds_t *dynamic_tenant_id, + flb_sds_t tenant_filter, + int filter_tenant, struct flb_mp_accessor *remove_mpa, struct flb_config *config) { @@ -1653,6 +1639,8 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, // msgpack_object *obj; struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + flb_sds_t record_tenant_id; + int pack; int ret; /* @@ -1727,12 +1715,25 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + pack = FLB_TRUE; + if (filter_tenant == FLB_TRUE) { + record_tenant_id = get_effective_tenant_id(ctx, log_event.body, FLB_FALSE); + pack = tenant_id_matches(record_tenant_id, tenant_filter); + if (record_tenant_id) { + flb_sds_destroy(record_tenant_id); + } + } + + if (pack == FLB_FALSE) { + continue; + } + msgpack_pack_array(&mp_pck, ctx->structured_metadata || ctx->structured_metadata_map_keys ? 3 : 2); /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); - pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa, config); + pack_record(ctx, &mp_pck, log_event.body, remove_mpa, config); if (ctx->structured_metadata || ctx->structured_metadata_map_keys) { pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL, config); } @@ -1749,6 +1750,19 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, while ((ret = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + pack = FLB_TRUE; + if (filter_tenant == FLB_TRUE) { + record_tenant_id = get_effective_tenant_id(ctx, log_event.body, FLB_FALSE); + pack = tenant_id_matches(record_tenant_id, tenant_filter); + if (record_tenant_id) { + flb_sds_destroy(record_tenant_id); + } + } + + if (pack == FLB_FALSE) { + continue; + } + /* map content: streams['stream'] & streams['values'] */ msgpack_pack_map(&mp_pck, 2); @@ -1769,7 +1783,7 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx, /* Append the timestamp */ pack_timestamp(&mp_pck, &log_event.timestamp); - pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id, remove_mpa, config); + pack_record(ctx, &mp_pck, log_event.body, remove_mpa, config); if (ctx->structured_metadata || ctx->structured_metadata_map_keys) { pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body, config); } @@ -1796,92 +1810,119 @@ static void payload_release(void *payload, int compressed) } } -static void cb_loki_flush(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *i_ins, - void *out_context, - struct flb_config *config) +static struct flb_loki_tenant_group *tenant_group_get(struct mk_list *groups, + flb_sds_t tenant_id) { - int ret; - int out_ret = FLB_OK; - size_t b_sent; - flb_sds_t payload = NULL; - flb_sds_t out_buf = NULL; - size_t out_size; - int compressed = FLB_FALSE; - struct flb_loki *ctx = out_context; - struct flb_connection *u_conn; - struct flb_http_client *c; - struct flb_loki_dynamic_tenant_id_entry *dynamic_tenant_id; - struct flb_loki_remove_mpa_entry *remove_mpa_entry; struct mk_list *head; - struct flb_config_map_val *mv; - struct flb_slist_entry *key = NULL; - struct flb_slist_entry *val = NULL; + struct flb_loki_tenant_group *group; - dynamic_tenant_id = FLB_TLS_GET(thread_local_tenant_id); - - remove_mpa_entry = FLB_TLS_GET(thread_local_remove_mpa); - - if (remove_mpa_entry == NULL) { - remove_mpa_entry = remove_mpa_entry_create(ctx); - if (!remove_mpa_entry) { - flb_plg_error(ctx->ins, "cannot allocate remove_mpa entry"); - FLB_OUTPUT_RETURN(FLB_RETRY); + mk_list_foreach(head, groups) { + group = mk_list_entry(head, struct flb_loki_tenant_group, _head); + if (tenant_id_matches(group->tenant_id, tenant_id) == FLB_TRUE) { + return group; } + } - FLB_TLS_SET(thread_local_remove_mpa, remove_mpa_entry); + return NULL; +} - pthread_mutex_lock(&ctx->remove_mpa_list_lock); - cfl_list_add(&remove_mpa_entry->_head, &ctx->remove_mpa_list); - pthread_mutex_unlock(&ctx->remove_mpa_list_lock); +static void tenant_groups_destroy(struct mk_list *groups) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_loki_tenant_group *group; + + mk_list_foreach_safe(head, tmp, groups) { + group = mk_list_entry(head, struct flb_loki_tenant_group, _head); + mk_list_del(&group->_head); + if (group->tenant_id) { + flb_sds_destroy(group->tenant_id); + } + flb_free(group); } +} - if (dynamic_tenant_id == NULL) { - dynamic_tenant_id = dynamic_tenant_id_create(); +static int collect_tenant_groups(struct flb_loki *ctx, + const void *data, size_t bytes, + struct mk_list *groups) +{ + int ret; + flb_sds_t tenant_id; + struct flb_loki_tenant_group *group; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; - if (dynamic_tenant_id == NULL) { - flb_errno(); - flb_plg_error(ctx->ins, "cannot allocate dynamic tenant id"); + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event decoder initialization error : %d", ret); - FLB_OUTPUT_RETURN(FLB_RETRY); - } + return -1; + } - FLB_TLS_SET(thread_local_tenant_id, dynamic_tenant_id); + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + tenant_id = get_effective_tenant_id(ctx, log_event.body, FLB_TRUE); + group = tenant_group_get(groups, tenant_id); + if (group == NULL) { + group = flb_calloc(1, sizeof(struct flb_loki_tenant_group)); + if (group == NULL) { + if (tenant_id) { + flb_sds_destroy(tenant_id); + } + flb_log_event_decoder_destroy(&log_decoder); + flb_errno(); - pthread_mutex_lock(&ctx->dynamic_tenant_list_lock); + return -1; + } - cfl_list_add(&dynamic_tenant_id->_head, &ctx->dynamic_tenant_list); + group->tenant_id = tenant_id; + group->records = 0; + mk_list_add(&group->_head, groups); + } + else if (tenant_id) { + flb_sds_destroy(tenant_id); + } - pthread_mutex_unlock(&ctx->dynamic_tenant_list_lock); + group->records++; } - /* Format the data to the expected Newrelic Payload */ - payload = loki_compose_payload(ctx, - event_chunk->total_events, - (char *) event_chunk->tag, - flb_sds_len(event_chunk->tag), - event_chunk->data, event_chunk->size, - &dynamic_tenant_id->value, - remove_mpa_entry->mpa, - config); + flb_log_event_decoder_destroy(&log_decoder); - if (!payload) { - flb_plg_error(ctx->ins, "cannot compose request payload"); + return 0; +} - FLB_OUTPUT_RETURN(FLB_RETRY); - } +static int send_loki_payload(struct flb_loki *ctx, + flb_sds_t payload, + flb_sds_t tenant_id, + struct flb_config *config) +{ + int ret; + int out_ret = FLB_OK; + int compressed = FLB_FALSE; + size_t b_sent; + size_t out_size; + flb_sds_t out_buf; + struct flb_connection *u_conn; + struct flb_http_client *c; + struct mk_list *head; + struct flb_config_map_val *mv; + struct flb_slist_entry *key = NULL; + struct flb_slist_entry *val = NULL; /* Map buffer */ out_buf = payload; out_size = flb_sds_len(payload); if (ctx->compress_gzip == FLB_TRUE) { - ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), (void **) &out_buf, &out_size); + ret = flb_gzip_compress((void *) payload, flb_sds_len(payload), + (void **) &out_buf, &out_size); if (ret == -1) { flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression"); - } else { + } + else { compressed = FLB_TRUE; /* payload is not longer needed */ flb_sds_destroy(payload); @@ -1895,7 +1936,7 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, payload_release(out_buf, compressed); - FLB_OUTPUT_RETURN(FLB_RETRY); + return FLB_RETRY; } /* Create HTTP client context */ @@ -1905,11 +1946,10 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, NULL, 0); if (!c) { flb_plg_error(ctx->ins, "cannot create HTTP client context"); - payload_release(out_buf, compressed); flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(FLB_RETRY); + return FLB_RETRY; } /* Set response buffer size */ @@ -1924,7 +1964,8 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, /* Auth headers */ if (ctx->http_user && ctx->http_passwd) { /* Basic */ flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); - } else if (ctx->bearer_token) { /* Bearer token */ + } + else if (ctx->bearer_token) { flb_http_bearer_auth(c, ctx->bearer_token); } @@ -1947,17 +1988,11 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, flb_http_set_content_encoding_gzip(c); } - /* Add X-Scope-OrgID header */ - if (dynamic_tenant_id->value != NULL) { + if (tenant_id != NULL) { flb_http_add_header(c, FLB_LOKI_HEADER_SCOPE, sizeof(FLB_LOKI_HEADER_SCOPE) - 1, - dynamic_tenant_id->value, - flb_sds_len(dynamic_tenant_id->value)); - } - else if (ctx->tenant_id) { - flb_http_add_header(c, - FLB_LOKI_HEADER_SCOPE, sizeof(FLB_LOKI_HEADER_SCOPE) - 1, - ctx->tenant_id, flb_sds_len(ctx->tenant_id)); + tenant_id, + flb_sds_len(tenant_id)); } /* Send HTTP request */ @@ -1990,15 +2025,15 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, else if (c->resp.status >= 500 && c->resp.status <= 599) { if (c->resp.payload) { flb_plg_error(ctx->ins, "could not flush records to %s:%i" - " HTTP status=%i", - ctx->tcp_host, ctx->tcp_port, c->resp.status); + " HTTP status=%i", + ctx->tcp_host, ctx->tcp_port, c->resp.status); flb_plg_trace(ctx->ins, "Response was:\n%s", - c->resp.payload); + c->resp.payload); } else { flb_plg_error(ctx->ins, "could not flush records to %s:%i" - " HTTP status=%i", - ctx->tcp_host, ctx->tcp_port, c->resp.status); + " HTTP status=%i", + ctx->tcp_host, ctx->tcp_port, c->resp.status); } /* * Server-side error occurred, do not reuse this connection for retry. @@ -2044,22 +2079,139 @@ static void cb_loki_flush(struct flb_event_chunk *event_chunk, flb_http_client_destroy(c); flb_upstream_conn_release(u_conn); - FLB_OUTPUT_RETURN(out_ret); + return out_ret; } -static void release_dynamic_tenant_ids(struct cfl_list *dynamic_tenant_list) +static uint64_t tenant_group_flush_status(int ret) { - struct cfl_list *iterator; - struct cfl_list *backup; - struct flb_loki_dynamic_tenant_id_entry *entry; + if (ret == FLB_OK) { + return FLB_LOKI_TENANT_GROUP_FLUSH_SUCCESS; + } + else if (ret == FLB_RETRY) { + return FLB_LOKI_TENANT_GROUP_FLUSH_RETRY; + } - cfl_list_foreach_safe(iterator, backup, dynamic_tenant_list) { - entry = cfl_list_entry(iterator, - struct flb_loki_dynamic_tenant_id_entry, - _head); + return FLB_LOKI_TENANT_GROUP_FLUSH_ERROR; +} + +static int tenant_group_flush_result(struct flb_loki *ctx, uint64_t status) +{ + if (status == 0) { + return FLB_OK; + } + + if ((status & FLB_LOKI_TENANT_GROUP_FLUSH_SUCCESS) == 0) { + if (status & FLB_LOKI_TENANT_GROUP_FLUSH_RETRY) { + return FLB_RETRY; + } + + return FLB_ERROR; + } + + if ((status & FLB_LOKI_TENANT_GROUP_FLUSH_RETRY) == 0 && + (status & FLB_LOKI_TENANT_GROUP_FLUSH_ERROR) == 0) { + return FLB_OK; + } + + if (ctx->out_tenant_id_key_error_handling == + FLB_LOKI_TENANT_ID_KEY_ERROR_PARTIAL_SUCCESS) { + return FLB_OK; + } - dynamic_tenant_id_destroy(entry); + return FLB_RETRY; +} + +static void cb_loki_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *i_ins, + void *out_context, + struct flb_config *config) +{ + int ret; + int out_ret = FLB_OK; + uint64_t tenant_group_status = 0; + flb_sds_t payload = NULL; + struct flb_loki *ctx = out_context; + struct flb_loki_remove_mpa_entry *remove_mpa_entry; + struct mk_list tenant_groups; + struct mk_list *head; + struct flb_loki_tenant_group *group; + + remove_mpa_entry = FLB_TLS_GET(thread_local_remove_mpa); + + if (remove_mpa_entry == NULL) { + remove_mpa_entry = remove_mpa_entry_create(ctx); + if (!remove_mpa_entry) { + flb_plg_error(ctx->ins, "cannot allocate remove_mpa entry"); + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + FLB_TLS_SET(thread_local_remove_mpa, remove_mpa_entry); + + pthread_mutex_lock(&ctx->remove_mpa_list_lock); + cfl_list_add(&remove_mpa_entry->_head, &ctx->remove_mpa_list); + pthread_mutex_unlock(&ctx->remove_mpa_list_lock); + } + + if (ctx->ra_tenant_id_key == NULL) { + payload = loki_compose_payload(ctx, + event_chunk->total_events, + (char *) event_chunk->tag, + flb_sds_len(event_chunk->tag), + event_chunk->data, event_chunk->size, + NULL, + FLB_FALSE, + remove_mpa_entry->mpa, + config); + + if (!payload) { + flb_plg_error(ctx->ins, "cannot compose request payload"); + + FLB_OUTPUT_RETURN(FLB_RETRY); + } + + out_ret = send_loki_payload(ctx, payload, ctx->tenant_id, config); + + FLB_OUTPUT_RETURN(out_ret); + } + + mk_list_init(&tenant_groups); + + ret = collect_tenant_groups(ctx, event_chunk->data, event_chunk->size, &tenant_groups); + if (ret != 0) { + tenant_groups_destroy(&tenant_groups); + + FLB_OUTPUT_RETURN(FLB_RETRY); } + + mk_list_foreach(head, &tenant_groups) { + group = mk_list_entry(head, struct flb_loki_tenant_group, _head); + + payload = loki_compose_payload(ctx, + group->records, + (char *) event_chunk->tag, + flb_sds_len(event_chunk->tag), + event_chunk->data, event_chunk->size, + group->tenant_id, + FLB_TRUE, + remove_mpa_entry->mpa, + config); + + if (!payload) { + flb_plg_error(ctx->ins, "cannot compose request payload"); + tenant_group_status |= FLB_LOKI_TENANT_GROUP_FLUSH_RETRY; + continue; + } + + ret = send_loki_payload(ctx, payload, group->tenant_id, config); + tenant_group_status |= tenant_group_flush_status(ret); + } + + out_ret = tenant_group_flush_result(ctx, tenant_group_status); + + tenant_groups_destroy(&tenant_groups); + + FLB_OUTPUT_RETURN(out_ret); } static void release_remove_mpa_entries(struct cfl_list *remove_mpa_list) @@ -2085,12 +2237,6 @@ static int cb_loki_exit(void *data, struct flb_config *config) return 0; } - pthread_mutex_lock(&ctx->dynamic_tenant_list_lock); - - release_dynamic_tenant_ids(&ctx->dynamic_tenant_list); - - pthread_mutex_unlock(&ctx->dynamic_tenant_list_lock); - pthread_mutex_lock(&ctx->remove_mpa_list_lock); release_remove_mpa_entries(&ctx->remove_mpa_list); @@ -2125,6 +2271,14 @@ static struct flb_config_map config_map[] = { "It is useful to set X-Scode-OrgID dynamically." }, + { + FLB_CONFIG_MAP_STR, "tenant_id_key_error_handling", "partial_success", + 0, FLB_TRUE, offsetof(struct flb_loki, tenant_id_key_error_handling), + "Set how tenant_id_key split request failures affect the whole chunk. " + "Options are 'partial_success' to treat mixed success and failure as " + "success, or 'partial_error' to retry mixed success and failure." + }, + { FLB_CONFIG_MAP_CLIST, "labels", NULL, 0, FLB_TRUE, offsetof(struct flb_loki, labels), @@ -2238,24 +2392,18 @@ static int cb_loki_format_test(struct flb_config *config, { int total_records; flb_sds_t payload = NULL; - flb_sds_t dynamic_tenant_id; struct flb_loki *ctx = plugin_context; - dynamic_tenant_id = NULL; - /* Count number of records */ total_records = flb_mp_count_log_records(data, bytes); payload = loki_compose_payload(ctx, total_records, (char *) tag, tag_len, data, bytes, - &dynamic_tenant_id, + NULL, + FLB_FALSE, ctx->remove_mpa, config); if (payload == NULL) { - if (dynamic_tenant_id != NULL) { - flb_sds_destroy(dynamic_tenant_id); - } - return -1; } diff --git a/plugins/out_loki/loki.h b/plugins/out_loki/loki.h index 312e11a5ac1..b0d7b831346 100644 --- a/plugins/out_loki/loki.h +++ b/plugins/out_loki/loki.h @@ -46,6 +46,10 @@ #define FLB_LOKI_DROP_SINGLE_KEY_ON (((uint64_t) 1) << 1) #define FLB_LOKI_DROP_SINGLE_KEY_RAW (((uint64_t) 1) << 2) +/* tenant_id_key split request error handling */ +#define FLB_LOKI_TENANT_ID_KEY_ERROR_PARTIAL_SUCCESS 0 +#define FLB_LOKI_TENANT_ID_KEY_ERROR_PARTIAL_ERROR 1 + struct flb_loki_kv { int val_type; /* FLB_LOKI_KV_STR or FLB_LOKI_KV_RA */ flb_sds_t key; /* string key */ @@ -65,6 +69,7 @@ struct flb_loki { flb_sds_t line_format; flb_sds_t tenant_id; flb_sds_t tenant_id_key_config; + flb_sds_t tenant_id_key_error_handling; int compress_gzip; /* HTTP Auth */ @@ -88,6 +93,7 @@ struct flb_loki { char *tcp_host; int out_line_format; int out_drop_single_key; + int out_tenant_id_key_error_handling; int ra_used; /* number of record accessor label keys */ struct flb_record_accessor *ra_k8s; /* kubernetes record accessor */ struct mk_list labels_list; /* list of flb_loki_kv nodes */ @@ -97,9 +103,6 @@ struct flb_loki { struct flb_mp_accessor *remove_mpa; /* remove_keys multi-pattern accessor */ struct flb_record_accessor *ra_tenant_id_key; /* dynamic tenant id key */ - struct cfl_list dynamic_tenant_list; - pthread_mutex_t dynamic_tenant_list_lock; - struct cfl_list remove_mpa_list; pthread_mutex_t remove_mpa_list_lock; diff --git a/tests/runtime/out_loki.c b/tests/runtime/out_loki.c index e177b01e675..d5885b72302 100644 --- a/tests/runtime/out_loki.c +++ b/tests/runtime/out_loki.c @@ -20,12 +20,51 @@ #include #include +#include +#include +#include +#include +#include #include "flb_tests_runtime.h" +#include + #define DPATH_LOKI FLB_TESTS_DATA_PATH "/data/loki" +#define LOKI_TENANT_POLICY_HOST "127.0.0.1" +#define LOKI_TENANT_SPLIT_PORT "18083" +#define LOKI_TENANT_POLICY_SUCCESS_PORT "18084" +#define LOKI_TENANT_POLICY_ERROR_PORT "18085" pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; int num_output = 0; +static flb_sds_t tenant_headers[4]; +static flb_sds_t tenant_payloads[4]; +static int tenant_request_count = 0; +static int tenant_policy_request_count = 0; +static int tenant_policy_a_count = 0; +static int tenant_policy_b_count = 0; +static int tenant_policy_fail_tenant_a = FLB_FALSE; + +struct tenant_policy_server { + struct flb_http_server server; + struct flb_net_setup net_setup; + struct mk_event_loop *event_loop; + pthread_t thread; + int thread_started; + int stop; +}; + +static int tenant_policy_server_should_stop(struct tenant_policy_server *mock) +{ + int stop; + + pthread_mutex_lock(&result_mutex); + stop = mock->stop; + pthread_mutex_unlock(&result_mutex); + + return stop; +} + static int get_output_num() { int ret; @@ -48,6 +87,240 @@ static void clear_output_num() set_output_num(0); } +static void clear_tenant_requests() +{ + int i; + + pthread_mutex_lock(&result_mutex); + for (i = 0; i < 4; i++) { + if (tenant_headers[i]) { + flb_sds_destroy(tenant_headers[i]); + tenant_headers[i] = NULL; + } + if (tenant_payloads[i]) { + flb_sds_destroy(tenant_payloads[i]); + tenant_payloads[i] = NULL; + } + } + tenant_request_count = 0; + pthread_mutex_unlock(&result_mutex); +} + +static int get_tenant_request_count() +{ + int ret; + + pthread_mutex_lock(&result_mutex); + ret = tenant_request_count; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void clear_tenant_policy_requests() +{ + pthread_mutex_lock(&result_mutex); + tenant_policy_request_count = 0; + tenant_policy_a_count = 0; + tenant_policy_b_count = 0; + tenant_policy_fail_tenant_a = FLB_FALSE; + pthread_mutex_unlock(&result_mutex); +} + +static int get_tenant_policy_request_count() +{ + int ret; + + pthread_mutex_lock(&result_mutex); + ret = tenant_policy_request_count; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static int get_tenant_policy_a_count() +{ + int ret; + + pthread_mutex_lock(&result_mutex); + ret = tenant_policy_a_count; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static int get_tenant_policy_b_count() +{ + int ret; + + pthread_mutex_lock(&result_mutex); + ret = tenant_policy_b_count; + pthread_mutex_unlock(&result_mutex); + + return ret; +} + +static void set_tenant_policy_fail_tenant_a(int fail) +{ + pthread_mutex_lock(&result_mutex); + tenant_policy_fail_tenant_a = fail; + pthread_mutex_unlock(&result_mutex); +} + +static int cb_loki_tenant_policy_server(struct flb_http_request *request, + struct flb_http_response *response) +{ + int status = 200; + int slot; + int fail_tenant_a; + char *tenant; + + tenant = flb_http_request_get_header(request, "x-scope-orgid"); + + pthread_mutex_lock(&result_mutex); + slot = tenant_request_count; + if (slot < 4) { + if (tenant != NULL) { + tenant_headers[slot] = flb_sds_create("X-Scope-OrgID: "); + if (tenant_headers[slot] != NULL) { + tenant_headers[slot] = flb_sds_cat(tenant_headers[slot], + tenant, strlen(tenant)); + } + } + + if (request->body != NULL) { + tenant_payloads[slot] = flb_sds_create_len(request->body, + cfl_sds_len(request->body)); + } + tenant_request_count++; + } + + tenant_policy_request_count++; + fail_tenant_a = tenant_policy_fail_tenant_a; + if (tenant != NULL && strcmp(tenant, "tenant-a") == 0) { + tenant_policy_a_count++; + if (fail_tenant_a == FLB_TRUE) { + status = 400; + } + } + else if (tenant != NULL && strcmp(tenant, "tenant-b") == 0) { + tenant_policy_b_count++; + } + pthread_mutex_unlock(&result_mutex); + + if (status == 400) { + return flb_hs_response_send_string(response, status, + FLB_HS_CONTENT_TYPE_OTHER, + "bad tenant"); + } + + return flb_hs_response_send_string(response, status, + FLB_HS_CONTENT_TYPE_OTHER, + "ok"); +} + +static void *tenant_policy_server_loop(void *data) +{ + struct mk_event *event; + struct tenant_policy_server *mock = data; + + flb_engine_evl_set(mock->event_loop); + + while (tenant_policy_server_should_stop(mock) == FLB_FALSE) { + mk_event_wait_2(mock->event_loop, 100); + + mk_event_foreach(event, mock->event_loop) { + if (event->type == FLB_ENGINE_EV_CUSTOM) { + event->handler(event); + } + } + + if (mock->server.downstream != NULL) { + flb_downstream_conn_pending_destroy(mock->server.downstream); + } + } + + return NULL; +} + +static int start_tenant_policy_server(struct tenant_policy_server *mock, + const char *port, + struct flb_config *config) +{ + int ret; + struct flb_http_server_options options; + + memset(mock, 0, sizeof(struct tenant_policy_server)); + + flb_http_server_options_init(&options); + flb_net_setup_init(&mock->net_setup); + + mock->event_loop = mk_event_loop_create(256); + if (mock->event_loop == NULL) { + return -1; + } + + options.protocol_version = HTTP_PROTOCOL_VERSION_11; + options.request_callback = cb_loki_tenant_policy_server; + options.address = (char *) LOKI_TENANT_POLICY_HOST; + options.port = (unsigned short) atoi(port); + options.networking_flags = 0; + options.networking_setup = &mock->net_setup; + options.event_loop = mock->event_loop; + options.system_context = config; + options.use_caller_event_loop = FLB_TRUE; + + ret = flb_http_server_init_with_options(&mock->server, &options); + if (ret != 0) { + mk_event_loop_destroy(mock->event_loop); + mock->event_loop = NULL; + return -1; + } + + ret = flb_http_server_start(&mock->server); + if (ret != 0) { + flb_http_server_destroy(&mock->server); + mk_event_loop_destroy(mock->event_loop); + mock->event_loop = NULL; + return -1; + } + + mock->stop = FLB_FALSE; + mock->thread_started = FLB_FALSE; + ret = pthread_create(&mock->thread, NULL, tenant_policy_server_loop, mock); + if (ret != 0) { + flb_http_server_stop(&mock->server); + flb_http_server_destroy(&mock->server); + mk_event_loop_destroy(mock->event_loop); + mock->event_loop = NULL; + return -1; + } + mock->thread_started = FLB_TRUE; + + flb_time_msleep(500); + + return 0; +} + +static void stop_tenant_policy_server(struct tenant_policy_server *mock) +{ + if (mock->event_loop == NULL) { + return; + } + + pthread_mutex_lock(&result_mutex); + mock->stop = FLB_TRUE; + pthread_mutex_unlock(&result_mutex); + + if (mock->thread_started == FLB_TRUE) { + pthread_join(mock->thread, NULL); + } + flb_http_server_stop(&mock->server); + flb_http_server_destroy(&mock->server); + mk_event_loop_destroy(mock->event_loop); + mock->event_loop = NULL; +} + #define JSON_BASIC "[12345678, {\"key\":\"value\"}]" static void cb_check_basic(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, @@ -690,6 +963,231 @@ void flb_test_remove_keys_workers() flb_destroy(ctx); } +static int check_tenant_request(char *tenant, char *present, char *absent) +{ + int i; + + for (i = 0; i < tenant_request_count; i++) { + if (tenant_headers[i] == NULL || tenant_payloads[i] == NULL) { + continue; + } + + if (strstr(tenant_headers[i], tenant) == NULL) { + continue; + } + + if (!TEST_CHECK(strstr(tenant_payloads[i], present) != NULL)) { + TEST_MSG("payload for %s did not contain %s: %s", + tenant, present, tenant_payloads[i]); + return -1; + } + + if (!TEST_CHECK(strstr(tenant_payloads[i], absent) == NULL)) { + TEST_MSG("payload for %s contained %s: %s", + tenant, absent, tenant_payloads[i]); + return -1; + } + + return 0; + } + + TEST_CHECK(0); + TEST_MSG("no request found for tenant %s", tenant); + + return -1; +} + +void flb_test_tenant_id_key_splits_requests() +{ + int ret; + int tries; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + struct tenant_policy_server mock_server; + char *tenant_a = "[12345678, {\"tenant_id\":\"tenant-a\",\"msg\":\"msg-a\"}]"; + char *tenant_b = "[12345679, {\"tenant_id\":\"tenant-b\",\"msg\":\"msg-b\"}]"; + + clear_tenant_requests(); + clear_tenant_policy_requests(); + + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "loki", NULL); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, + "match", "test", + "host", LOKI_TENANT_POLICY_HOST, + "port", LOKI_TENANT_SPLIT_PORT, + "tenant_id_key", "tenant_id", + "remove_keys", "tenant_id", + "net.keepalive", "off", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + ret = start_tenant_policy_server(&mock_server, LOKI_TENANT_SPLIT_PORT, + ctx->config); + TEST_CHECK(ret == 0); + + ret = flb_lib_push(ctx, in_ffd, tenant_a, strlen(tenant_a)); + TEST_CHECK(ret >= 0); + ret = flb_lib_push(ctx, in_ffd, tenant_b, strlen(tenant_b)); + TEST_CHECK(ret >= 0); + + for (tries = 0; tries < 20 && get_tenant_request_count() < 2; tries++) { + flb_time_msleep(500); + } + + if (!TEST_CHECK(get_tenant_request_count() == 2)) { + TEST_MSG("expected 2 requests, got %d", get_tenant_request_count()); + } + + pthread_mutex_lock(&result_mutex); + check_tenant_request("X-Scope-OrgID: tenant-a", "msg-a", "msg-b"); + check_tenant_request("X-Scope-OrgID: tenant-b", "msg-b", "msg-a"); + pthread_mutex_unlock(&result_mutex); + + flb_stop(ctx); + stop_tenant_policy_server(&mock_server); + flb_destroy(ctx); + clear_tenant_requests(); + clear_tenant_policy_requests(); +} + +static void run_tenant_id_key_partial_handling(char *mode, + char *port, + int expect_retry) +{ + int ret; + int tries; + int in_ffd; + int out_ffd; + flb_ctx_t *ctx; + struct tenant_policy_server mock_server; + char *tenant_a = "[12345678, {\"tenant_id\":\"tenant-a\",\"msg\":\"msg-a\"}]"; + char *tenant_b = "[12345679, {\"tenant_id\":\"tenant-b\",\"msg\":\"msg-b\"}]"; + + clear_tenant_requests(); + clear_tenant_policy_requests(); + set_tenant_policy_fail_tenant_a(FLB_TRUE); + + ctx = flb_create(); + flb_service_set(ctx, "flush", "1", "grace", "1", + "log_level", "error", + "scheduler.base", "1", + "scheduler.cap", "1", + NULL); + + in_ffd = flb_input(ctx, (char *) "lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *) "loki", NULL); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, + "match", "test", + "host", LOKI_TENANT_POLICY_HOST, + "port", port, + "tenant_id_key", "tenant_id", + "tenant_id_key_error_handling", mode, + "remove_keys", "tenant_id", + "Retry_Limit", "1", + "net.keepalive", "off", /* Prevent mock server 10s timeout races */ + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + ret = start_tenant_policy_server(&mock_server, port, ctx->config); + TEST_CHECK(ret == 0); + + ret = flb_lib_push(ctx, in_ffd, tenant_a, strlen(tenant_a)); + TEST_CHECK(ret >= 0); + ret = flb_lib_push(ctx, in_ffd, tenant_b, strlen(tenant_b)); + TEST_CHECK(ret >= 0); + + for (tries = 0; tries < 40 && get_tenant_policy_request_count() < 2; tries++) { + flb_time_msleep(500); + } + + if (!TEST_CHECK(get_tenant_policy_request_count() >= 2)) { + TEST_MSG("expected at least 2 requests, got %d", + get_tenant_policy_request_count()); + } + + if (!TEST_CHECK(get_tenant_policy_a_count() >= 1)) { + TEST_MSG("expected tenant-a request, got %d", + get_tenant_policy_a_count()); + } + + if (!TEST_CHECK(get_tenant_policy_b_count() >= 1)) { + TEST_MSG("expected tenant-b request after tenant-a failure, got %d", + get_tenant_policy_b_count()); + } + + if (expect_retry == FLB_TRUE) { + for (tries = 0; + tries < 40 && get_tenant_policy_request_count() < 4; + tries++) { + flb_time_msleep(500); + } + + if (!TEST_CHECK(get_tenant_policy_request_count() >= 4)) { + TEST_MSG("expected retry requests, got %d", + get_tenant_policy_request_count()); + } + + if (!TEST_CHECK(get_tenant_policy_a_count() >= 2)) { + TEST_MSG("expected tenant-a retry, got %d", + get_tenant_policy_a_count()); + } + + if (!TEST_CHECK(get_tenant_policy_b_count() >= 2)) { + TEST_MSG("expected tenant-b duplicate on retry, got %d", + get_tenant_policy_b_count()); + } + } + else { + flb_time_msleep(2500); /* > one retry window with scheduler.base/cap = 1 */ + + if (!TEST_CHECK(get_tenant_policy_request_count() == 2)) { + TEST_MSG("expected no retry requests, got %d", + get_tenant_policy_request_count()); + } + } + + flb_stop(ctx); + stop_tenant_policy_server(&mock_server); + flb_destroy(ctx); + clear_tenant_requests(); + clear_tenant_policy_requests(); +} + +void flb_test_tenant_id_key_partial_success() +{ + run_tenant_id_key_partial_handling("partial_success", + LOKI_TENANT_POLICY_SUCCESS_PORT, + FLB_FALSE); +} + +void flb_test_tenant_id_key_partial_error() +{ + run_tenant_id_key_partial_handling("partial_error", + LOKI_TENANT_POLICY_ERROR_PORT, + FLB_TRUE); +} + static void cb_check_label_map_path(void *ctx, int ffd, int res_ret, void *res_data, size_t res_size, void *data) @@ -1019,6 +1517,9 @@ TEST_LIST = { {"labels_ra" , flb_test_labels_ra }, {"remove_keys" , flb_test_remove_keys }, {"remove_keys_workers" , flb_test_remove_keys_workers }, + {"tenant_id_key_splits_requests", flb_test_tenant_id_key_splits_requests }, + {"tenant_id_key_partial_success", flb_test_tenant_id_key_partial_success }, + {"tenant_id_key_partial_error", flb_test_tenant_id_key_partial_error }, {"basic" , flb_test_basic }, {"labels" , flb_test_labels }, {"label_keys" , flb_test_label_keys },