diff --git a/include/fluent-bit/flb_opentelemetry.h b/include/fluent-bit/flb_opentelemetry.h index cdb529e0aef..e6cab38ee85 100644 --- a/include/fluent-bit/flb_opentelemetry.h +++ b/include/fluent-bit/flb_opentelemetry.h @@ -117,6 +117,8 @@ struct flb_opentelemetry_otlp_logs_options { const char **logs_body_keys; size_t logs_body_key_count; int logs_body_key_attributes; + int max_resources; /* 0 = unlimited */ + int max_scopes; /* 0 = unlimited, per resource */ }; /* Backward-compatible alias for older external callers. */ diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 50bd7e05edc..17df2c94776 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -193,7 +194,8 @@ static int opentelemetry_check_grpc_status(struct opentelemetry_context *ctx, int opentelemetry_legacy_post(struct opentelemetry_context *ctx, const void *body, size_t body_len, const char *tag, int tag_len, - const char *uri) + const char *uri, + const char *content_type) { size_t final_body_len; void *final_body; @@ -284,8 +286,8 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx, flb_http_add_header(c, FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME, sizeof(FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME) - 1, - FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL, - sizeof(FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL) - 1); + content_type, + strlen(content_type)); /* Basic Auth headers */ if (ctx->http_user != NULL && @@ -419,7 +421,8 @@ int opentelemetry_post(struct opentelemetry_context *ctx, const void *body, size_t body_len, const char *tag, int tag_len, const char *http_uri, - const char *grpc_uri) + const char *grpc_uri, + const char *content_type) { flb_sds_t oauth2_token; const char *compression_algorithm; @@ -438,7 +441,8 @@ int opentelemetry_post(struct opentelemetry_context *ctx, return opentelemetry_legacy_post(ctx, body, 
body_len, tag, tag_len, - http_uri); + http_uri, + content_type); } compression_algorithm = NULL; @@ -542,8 +546,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx, result = flb_http_request_set_parameters(request, FLB_HTTP_CLIENT_ARGUMENT_URI(http_uri), - FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE( - FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL), + FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(content_type), FLB_HTTP_CLIENT_ARGUMENT_BODY(body, body_len, compression_algorithm)); @@ -730,6 +733,104 @@ static int opentelemetry_format_test(struct flb_config *config, return 0; } +static int process_metrics_json(struct flb_event_chunk *event_chunk, + struct opentelemetry_context *ctx) +{ + int ret; + int result; + int json_result; + int c = 0; + int first_entry; + int ok; + size_t off = 0; + size_t rendered_len; + size_t inner_len; + struct cmt *cmt; + flb_sds_t rendered; + flb_sds_t output; + + /* + * Length of the {"resourceMetrics":[ wrapper that + * flb_opentelemetry_metrics_to_otlp_json emits around the array content. + * We strip it from each per-context render and accumulate the inner + * elements into a single merged document, avoiding a second msgpack + * decode pass. 
+ */ + static const size_t prefix_len = sizeof("{\"resourceMetrics\":[") - 1; + static const size_t suffix_len = sizeof("]}") - 1; + + ok = CMT_DECODE_MSGPACK_SUCCESS; + first_entry = FLB_TRUE; + + output = flb_sds_create("{\"resourceMetrics\":["); + if (!output) { + flb_plg_error(ctx->ins, "could not allocate metrics output buffer"); + return FLB_RETRY; + } + + while ((ret = cmt_decode_msgpack_create(&cmt, + (char *) event_chunk->data, + event_chunk->size, &off)) == ok) { + append_labels(ctx, cmt); + + rendered = flb_opentelemetry_metrics_to_otlp_json(cmt, &json_result); + cmt_destroy(cmt); + + if (rendered == NULL) { + flb_plg_error(ctx->ins, + "failed to encode metrics as OTLP JSON (result=%d)", + json_result); + flb_sds_destroy(output); + return FLB_ERROR; + } + + rendered_len = flb_sds_len(rendered); + if (rendered_len > prefix_len + suffix_len) { + inner_len = rendered_len - prefix_len - suffix_len; + + if (!first_entry && flb_sds_cat_safe(&output, ",", 1) != 0) { + flb_sds_destroy(rendered); + flb_sds_destroy(output); + return FLB_RETRY; + } + + if (flb_sds_cat_safe(&output, + rendered + prefix_len, + (int) inner_len) != 0) { + flb_sds_destroy(rendered); + flb_sds_destroy(output); + return FLB_RETRY; + } + + first_entry = FLB_FALSE; + } + + flb_sds_destroy(rendered); + c++; + } + + if (ret != CMT_DECODE_MSGPACK_INSUFFICIENT_DATA || c == 0) { + flb_plg_error(ctx->ins, "error decoding metrics msgpack encoded context"); + flb_sds_destroy(output); + return FLB_ERROR; + } + + if (flb_sds_cat_safe(&output, "]}", 2) != 0) { + flb_sds_destroy(output); + return FLB_RETRY; + } + + result = opentelemetry_post(ctx, + output, flb_sds_len(output), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->metrics_uri_sanitized, + ctx->grpc_metrics_uri, + FLB_OPENTELEMETRY_MIME_JSON_LITERAL); + flb_sds_destroy(output); + return result; +} + static int process_metrics(struct flb_event_chunk *event_chunk, struct flb_output_flush *out1_flush, struct flb_input_instance *ins, 
void *out_context, @@ -748,6 +849,10 @@ static int process_metrics(struct flb_event_chunk *event_chunk, /* Initialize vars */ ctx = out_context; + + if (ctx->use_json_encoding) { + return process_metrics_json(event_chunk, ctx); + } ok = CMT_DECODE_MSGPACK_SUCCESS; result = FLB_OK; @@ -800,7 +905,8 @@ static int process_metrics(struct flb_event_chunk *event_chunk, event_chunk->tag, flb_sds_len(event_chunk->tag), ctx->metrics_uri_sanitized, - ctx->grpc_metrics_uri); + ctx->grpc_metrics_uri, + FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL); /* Debug http_post() result statuses */ if (result == FLB_OK) { @@ -830,6 +936,31 @@ static int process_metrics(struct flb_event_chunk *event_chunk, return result; } +static int process_traces_json(struct flb_event_chunk *event_chunk, + struct opentelemetry_context *ctx) +{ + int result; + flb_sds_t json_payload; + + json_payload = flb_opentelemetry_traces_msgpack_to_otlp_json(event_chunk->data, + event_chunk->size, + &result); + if (json_payload == NULL) { + flb_plg_error(ctx->ins, "failed to encode traces as OTLP JSON (result=%d)", result); + return FLB_ERROR; + } + + result = opentelemetry_post(ctx, + json_payload, flb_sds_len(json_payload), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->traces_uri_sanitized, + ctx->grpc_traces_uri, + FLB_OPENTELEMETRY_MIME_JSON_LITERAL); + flb_sds_destroy(json_payload); + return result; +} + static int process_traces(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *ins, void *out_context, @@ -847,6 +978,10 @@ static int process_traces(struct flb_event_chunk *event_chunk, ctx = out_context; result = FLB_OK; + if (ctx->use_json_encoding) { + return process_traces_json(event_chunk, ctx); + } + buf = flb_sds_create_size(event_chunk->size); if (!buf) { flb_plg_error(ctx->ins, "could not allocate outgoing buffer"); @@ -891,7 +1026,8 @@ static int process_traces(struct flb_event_chunk *event_chunk, event_chunk->tag, 
flb_sds_len(event_chunk->tag), ctx->traces_uri_sanitized, - ctx->grpc_traces_uri); + ctx->grpc_traces_uri, + FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL); /* Debug http_post() result statuses */ if (result == FLB_OK) { @@ -973,7 +1109,8 @@ static int process_profiles(struct flb_event_chunk *event_chunk, event_chunk->tag, flb_sds_len(event_chunk->tag), ctx->profiles_uri_sanitized, - ctx->grpc_profiles_uri); + ctx->grpc_profiles_uri, + FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL); /* Debug http_post() result statuses */ if (result == FLB_OK) { @@ -1070,7 +1207,7 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_BOOL, "grpc", "off", 0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_grpc_flag), - "Enable, disable or force gRPC usage. Accepted values : on, off, auto" + "Enable gRPC transport (OTLP/gRPC). When enabled, HTTP/2 is automatically activated. Accepted values: on, off" }, { FLB_CONFIG_MAP_STR, "proxy", NULL, @@ -1123,6 +1260,15 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size), "Set the maximum number of log records to be flushed at a time" }, + { + FLB_CONFIG_MAP_STR, "encoding", "protobuf", + 0, FLB_FALSE, 0, + "Set the encoding for exports (logs, traces, metrics). Accepted values: 'protobuf' (default, " + "Content-Type: application/x-protobuf) or 'json' (Content-Type: application/json). " + "Note: 'json' only applies to OTLP/HTTP transport; OTLP/gRPC (grpc=on) always uses protobuf " + "per the OpenTelemetry specification and will reject this combination at startup. " + "Note: profiles always use protobuf (no JSON encoder exists)." + }, { FLB_CONFIG_MAP_STR, "compress", NULL, 0, FLB_FALSE, 0, @@ -1160,7 +1306,8 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "logs_body_key", NULL, FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context, log_body_key_list_str), - "Specify an optional HTTP URI for the target OTel endpoint." 
+ "Record accessor pattern(s) used to populate the OTLP log body. The first matching key wins. " + "May be specified multiple times. Defaults to '$log' then '$message'." }, { diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index b45b97c92e6..1e8b052950c 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -34,6 +34,7 @@ #define FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME "Content-Type" #define FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL "application/x-protobuf" +#define FLB_OPENTELEMETRY_MIME_JSON_LITERAL "application/json" /* * This lets you send log records in batches instead of a request per log record @@ -190,6 +191,9 @@ struct opentelemetry_context { /* compression: zstd */ int compress_zstd; + /* encoding: 0 = protobuf (default), 1 = JSON */ + int use_json_encoding; + /* FLB/OTLP Record accessor patterns */ struct flb_record_accessor *ra_meta_schema; struct flb_record_accessor *ra_meta_resource_id; @@ -217,5 +221,6 @@ int opentelemetry_post(struct opentelemetry_context *ctx, const void *body, size_t body_len, const char *tag, int tag_len, const char *http_uri, - const char *grpc_uri); + const char *grpc_uri, + const char *content_type); #endif diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index c09cbf9eb2a..cde9bbbb417 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -548,6 +548,44 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output } } + ctx->use_json_encoding = FLB_FALSE; + tmp = flb_output_get_property("encoding", ins); + if (tmp) { + if (strcasecmp(tmp, "json") == 0) { + ctx->use_json_encoding = FLB_TRUE; + } + else if (strcasecmp(tmp, "protobuf") != 0) { + flb_plg_error(ctx->ins, + "Unknown encoding value '%s'. 
" + "Accepted values: 'protobuf' (default) or 'json'", tmp); + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + } + + /* + * Per the OpenTelemetry specification, OTLP/gRPC exclusively uses protobuf + * encoding on the wire. Reject the combination early so the user gets a + * clear error rather than silent data corruption. + * Reference: https://opentelemetry.io/docs/specs/otlp/#otlpgrpc + */ + if (ctx->use_json_encoding && ctx->enable_grpc_flag) { + flb_plg_error(ctx->ins, + "encoding=json is incompatible with grpc=on. " + "OTLP/gRPC only supports protobuf encoding per the " + "OpenTelemetry specification. " + "Use encoding=json with HTTP transport (grpc=off) instead."); + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + + if (ctx->use_json_encoding) { + flb_plg_warn(ctx->ins, + "encoding=json does not apply to profiles: profiles " + "will always be exported as protobuf " + "(no JSON encoder exists for cprofiles)"); + } + ctx->ra_observed_timestamp_metadata = flb_ra_create((char*)ctx->logs_observed_timestamp_metadata_key, FLB_FALSE); if (ctx->ra_observed_timestamp_metadata == NULL) { diff --git a/plugins/out_opentelemetry/opentelemetry_logs.c b/plugins/out_opentelemetry/opentelemetry_logs.c index f8af8577af8..44373af7721 100644 --- a/plugins/out_opentelemetry/opentelemetry_logs.c +++ b/plugins/out_opentelemetry/opentelemetry_logs.c @@ -936,12 +936,12 @@ static int logs_flush_to_otel(struct opentelemetry_context *ctx, struct flb_even opentelemetry__proto__collector__logs__v1__export_logs_service_request__pack(export_logs, body); - /* send post request to opentelemetry with content type application/x-protobuf */ ret = opentelemetry_post(ctx, body, len, event_chunk->tag, flb_sds_len(event_chunk->tag), ctx->logs_uri_sanitized, - ctx->grpc_logs_uri); + ctx->grpc_logs_uri, + FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL); flb_free(body); return ret; @@ -1107,6 +1107,63 @@ static int set_scope_attributes(struct flb_record_accessor *ra, return 0; } 
+static int otel_process_logs_json(struct flb_event_chunk *event_chunk, + struct opentelemetry_context *ctx) +{ + int ret; + int json_result; + size_t key_count; + size_t key_idx; + flb_sds_t json_payload; + struct mk_list *head; + struct opentelemetry_body_key *bk; + struct flb_opentelemetry_otlp_logs_options opts = {0}; + const char **body_keys = NULL; + + key_count = mk_list_size(&ctx->log_body_key_list); + if (key_count > 0) { + body_keys = flb_calloc(key_count, sizeof(const char *)); + if (!body_keys) { + flb_errno(); + return FLB_ERROR; + } + + key_idx = 0; + mk_list_foreach(head, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + body_keys[key_idx++] = bk->key; + } + + opts.logs_body_keys = body_keys; + opts.logs_body_key_count = key_count; + } + + opts.logs_body_key_attributes = ctx->logs_body_key_attributes; + opts.max_resources = ctx->max_resources; + opts.max_scopes = ctx->max_scopes; + + json_payload = flb_opentelemetry_logs_to_otlp_json(event_chunk->data, + event_chunk->size, + &opts, + &json_result); + flb_free(body_keys); + + if (json_payload == NULL) { + flb_plg_error(ctx->ins, "failed to encode logs as OTLP JSON (result=%d)", json_result); + return FLB_ERROR; + } + + ret = opentelemetry_post(ctx, + json_payload, flb_sds_len(json_payload), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->logs_uri_sanitized, + ctx->grpc_logs_uri, + FLB_OPENTELEMETRY_MIME_JSON_LITERAL); + flb_sds_destroy(json_payload); + return ret; +} + int otel_process_logs(struct flb_event_chunk *event_chunk, struct flb_output_flush *out_flush, struct flb_input_instance *ins, void *out_context, @@ -1153,6 +1210,10 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, ctx = (struct opentelemetry_context *) out_context; + if (ctx->use_json_encoding) { + return otel_process_logs_json(event_chunk, ctx); + } + decoder = flb_log_event_decoder_create((char *) event_chunk->data, event_chunk->size); if (decoder == NULL) { 
flb_plg_error(ctx->ins, "could not initialize record decoder"); diff --git a/src/opentelemetry/flb_opentelemetry_otlp_json.c b/src/opentelemetry/flb_opentelemetry_otlp_json.c index 65ce09b484b..a9d2d4db358 100644 --- a/src/opentelemetry/flb_opentelemetry_otlp_json.c +++ b/src/opentelemetry/flb_opentelemetry_otlp_json.c @@ -73,65 +73,114 @@ struct otlp_metrics_scope_state { static msgpack_object *msgpack_map_get_object(msgpack_object_map *map, const char *key); -static uint64_t msgpack_object_hash(msgpack_object *object) +/* + * stream_hash_msgpack_object - feed a deterministic byte sequence for 'obj' + * into a running XXH3 state without allocating a serialisation buffer. + * + * Each value is prefixed with a type discriminant byte so that objects of + * different types with coincidentally equal content produce different hashes + * (e.g. the integer 1 and the boolean true are distinguishable). + * Container sizes are fed as native-endian uint32_t; this is consistent + * within a single process run, which is all these hashes are used for. + */ +static void stream_hash_msgpack_object(cfl_hash_state_t *state, + msgpack_object *obj) { - uint64_t hash; - msgpack_sbuffer buffer; - msgpack_packer packer; + uint32_t sz; + uint8_t tag; + size_t i; - if (object == NULL) { - return cfl_hash_64bits("null", 4); + tag = (obj == NULL) ? (uint8_t) MSGPACK_OBJECT_NIL : (uint8_t) obj->type; + cfl_hash_64bits_update(state, &tag, 1); + + if (obj == NULL) { + return; } - msgpack_sbuffer_init(&buffer); - msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + switch (obj->type) { + case MSGPACK_OBJECT_NIL: + break; - if (msgpack_pack_object(&packer, *object) != 0) { - msgpack_sbuffer_destroy(&buffer); - return 0; - } + case MSGPACK_OBJECT_BOOLEAN: + tag = obj->via.boolean ? 
1 : 0; + cfl_hash_64bits_update(state, &tag, 1); + break; - hash = cfl_hash_64bits(buffer.data, buffer.size); - msgpack_sbuffer_destroy(&buffer); + case MSGPACK_OBJECT_POSITIVE_INTEGER: + cfl_hash_64bits_update(state, &obj->via.u64, sizeof(obj->via.u64)); + break; - return hash; -} + case MSGPACK_OBJECT_NEGATIVE_INTEGER: + cfl_hash_64bits_update(state, &obj->via.i64, sizeof(obj->via.i64)); + break; -static uint64_t msgpack_object_pair_hash(msgpack_object *left, - msgpack_object *right) -{ - uint64_t hash; - msgpack_sbuffer buffer; - msgpack_packer packer; + case MSGPACK_OBJECT_FLOAT32: + case MSGPACK_OBJECT_FLOAT64: + cfl_hash_64bits_update(state, &obj->via.u64, sizeof(obj->via.u64)); + break; - msgpack_sbuffer_init(&buffer); - msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); + case MSGPACK_OBJECT_STR: + sz = (uint32_t) obj->via.str.size; + cfl_hash_64bits_update(state, &sz, sizeof(sz)); + cfl_hash_64bits_update(state, obj->via.str.ptr, obj->via.str.size); + break; - if (msgpack_pack_array(&packer, 2) != 0) { - msgpack_sbuffer_destroy(&buffer); - return 0; - } + case MSGPACK_OBJECT_BIN: + sz = (uint32_t) obj->via.bin.size; + cfl_hash_64bits_update(state, &sz, sizeof(sz)); + cfl_hash_64bits_update(state, obj->via.bin.ptr, obj->via.bin.size); + break; - if (left == NULL) { - msgpack_pack_nil(&packer); - } - else if (msgpack_pack_object(&packer, *left) != 0) { - msgpack_sbuffer_destroy(&buffer); - return 0; - } + case MSGPACK_OBJECT_ARRAY: + sz = obj->via.array.size; + cfl_hash_64bits_update(state, &sz, sizeof(sz)); + for (i = 0; i < obj->via.array.size; i++) { + stream_hash_msgpack_object(state, &obj->via.array.ptr[i]); + } + break; - if (right == NULL) { - msgpack_pack_nil(&packer); - } - else if (msgpack_pack_object(&packer, *right) != 0) { - msgpack_sbuffer_destroy(&buffer); - return 0; + case MSGPACK_OBJECT_MAP: + sz = obj->via.map.size; + cfl_hash_64bits_update(state, &sz, sizeof(sz)); + for (i = 0; i < obj->via.map.size; i++) { + 
stream_hash_msgpack_object(state, &obj->via.map.ptr[i].key); + stream_hash_msgpack_object(state, &obj->via.map.ptr[i].val); + } + break; + + case MSGPACK_OBJECT_EXT: + tag = (uint8_t) obj->via.ext.type; + cfl_hash_64bits_update(state, &tag, 1); + sz = (uint32_t) obj->via.ext.size; + cfl_hash_64bits_update(state, &sz, sizeof(sz)); + cfl_hash_64bits_update(state, obj->via.ext.ptr, obj->via.ext.size); + break; + + default: + break; } +} - hash = cfl_hash_64bits(buffer.data, buffer.size); - msgpack_sbuffer_destroy(&buffer); +static uint64_t msgpack_object_hash(msgpack_object *object) +{ + cfl_hash_state_t state; - return hash; + cfl_hash_64bits_reset(&state); + stream_hash_msgpack_object(&state, object); + return cfl_hash_64bits_digest(&state); +} + +static uint64_t msgpack_object_pair_hash(msgpack_object *left, + msgpack_object *right) +{ + cfl_hash_state_t state; + uint8_t marker = 2; /* 2-element tuple discriminant */ + + cfl_hash_64bits_reset(&state); + cfl_hash_64bits_update(&state, &marker, 1); + stream_hash_msgpack_object(&state, left); + stream_hash_msgpack_object(&state, right); + return cfl_hash_64bits_digest(&state); } static msgpack_object *resource_schema_url_object(msgpack_object *resource_object, @@ -1258,13 +1307,25 @@ static struct otlp_logs_scope_state *append_logs_scope_state( return state; } +/* + * ensure_default_logs_scope_state — find or create the synthetic default + * resource+scope used for ungrouped log records (require_otel_metadata=false). 
+ * + * Return values: + * 0 success + * 1 record silently dropped because max_resources or max_scopes would be + * exceeded (caller should continue to the next event) + * -1 allocation failure + */ static int ensure_default_logs_scope_state( struct flb_json_mut_doc *doc, struct flb_json_mut_val *resource_logs, struct otlp_logs_resource_state **resource_states, size_t *resource_state_count, struct otlp_logs_resource_state **current_resource, - struct otlp_logs_scope_state **current_scope) + struct otlp_logs_scope_state **current_scope, + int max_resources, + int max_scopes) { uint64_t resource_hash; uint64_t scope_hash; @@ -1277,6 +1338,10 @@ static int ensure_default_logs_scope_state( 0, resource_hash); if (*current_resource == NULL) { + if (max_resources > 0 && + *resource_state_count >= (size_t) max_resources) { + return 1; + } *current_resource = append_logs_resource_state(doc, resource_logs, resource_states, @@ -1292,6 +1357,10 @@ static int ensure_default_logs_scope_state( *current_scope = find_logs_scope_state(*current_resource, 0, scope_hash); if (*current_scope == NULL) { + if (max_scopes > 0 && + (*current_resource)->scope_count >= (size_t) max_scopes) { + return 1; + } *current_scope = append_logs_scope_state(doc, *current_resource, 0, @@ -1323,17 +1392,24 @@ static msgpack_object *find_log_body_candidate(msgpack_object *body, } for (index = 0; index < logs_body_key_count; index++) { - if (logs_body_keys[index] == NULL) { + const char *lookup = logs_body_keys[index]; + + if (lookup == NULL) { continue; } - candidate = msgpack_map_get_object(&body->via.map, logs_body_keys[index]); + /* Strip the leading '$' from record-accessor patterns (e.g. "$log" -> "log"). 
*/ + if (lookup[0] == '$' && lookup[1] != '\0') { + lookup = lookup + 1; + } + + candidate = msgpack_map_get_object(&body->via.map, lookup); if (candidate != NULL) { if (matched_key != NULL) { - *matched_key = logs_body_keys[index]; + *matched_key = lookup; } if (matched_key_length != NULL) { - *matched_key_length = strlen(logs_body_keys[index]); + *matched_key_length = strlen(lookup); } return candidate; } @@ -1580,6 +1656,9 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( int *result) { int ret; + int max_resources; + int max_scopes; + int skip_current_group; int32_t record_type; int logs_body_key_attributes; int require_otel_metadata; @@ -1617,10 +1696,14 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( logs_body_keys = default_logs_body_keys; logs_body_key_count = 2; logs_body_key_attributes = FLB_FALSE; + max_resources = 0; + max_scopes = 0; if (options != NULL) { require_otel_metadata = options->logs_require_otel_metadata; logs_body_key_attributes = options->logs_body_key_attributes; + max_resources = options->max_resources; + max_scopes = options->max_scopes; if (options->logs_body_keys != NULL && options->logs_body_key_count > 0) { logs_body_keys = options->logs_body_keys; logs_body_key_count = options->logs_body_key_count; @@ -1656,6 +1739,7 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( resource_state_count = 0; current_resource = NULL; current_scope = NULL; + skip_current_group = FLB_FALSE; json = NULL; if (doc == NULL || root == NULL || resource_logs == NULL || @@ -1683,6 +1767,7 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( } if (record_type == FLB_LOG_EVENT_GROUP_START) { + skip_current_group = FLB_FALSE; group_metadata = event.group_metadata != NULL ? 
event.group_metadata : event.metadata; group_body = event.body; @@ -1701,6 +1786,7 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( return NULL; } + skip_current_group = FLB_TRUE; current_resource = NULL; current_scope = NULL; continue; @@ -1722,6 +1808,15 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( resource_id, resource_hash); if (current_resource == NULL) { + if (max_resources > 0 && + resource_state_count >= (size_t) max_resources) { + /* resource limit reached: skip all records in this group */ + skip_current_group = FLB_TRUE; + current_resource = NULL; + current_scope = NULL; + continue; + } + current_resource = append_logs_resource_state(doc, resource_logs, &resource_states, @@ -1743,6 +1838,14 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( scope_id, scope_hash); if (current_scope == NULL) { + if (max_scopes > 0 && + current_resource->scope_count >= (size_t) max_scopes) { + /* scope limit reached: skip all records in this group */ + skip_current_group = FLB_TRUE; + current_scope = NULL; + continue; + } + current_scope = append_logs_scope_state(doc, current_resource, scope_id, @@ -1760,11 +1863,16 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( continue; } else if (record_type == FLB_LOG_EVENT_GROUP_END) { + skip_current_group = FLB_FALSE; current_resource = NULL; current_scope = NULL; continue; } + if (skip_current_group == FLB_TRUE) { + continue; + } + if (current_scope == NULL) { if (require_otel_metadata == FLB_TRUE) { flb_log_event_decoder_destroy(&decoder); @@ -1774,17 +1882,22 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( return NULL; } - if (ensure_default_logs_scope_state(doc, - resource_logs, - &resource_states, - &resource_state_count, - ¤t_resource, - ¤t_scope) != 0) { - flb_log_event_decoder_destroy(&decoder); - destroy_logs_resource_states(resource_states, resource_state_count); - flb_json_mut_doc_destroy(doc); - set_error(result, 
FLB_OPENTELEMETRY_OTLP_JSON_NOT_SUPPORTED, ENOMEM); - return NULL; + { + int cap_ret = ensure_default_logs_scope_state( + doc, resource_logs, + &resource_states, &resource_state_count, + ¤t_resource, ¤t_scope, + max_resources, max_scopes); + if (cap_ret == 1) { + continue; /* record dropped: cap reached */ + } + if (cap_ret != 0) { + flb_log_event_decoder_destroy(&decoder); + destroy_logs_resource_states(resource_states, resource_state_count); + flb_json_mut_doc_destroy(doc); + set_error(result, FLB_OPENTELEMETRY_OTLP_JSON_NOT_SUPPORTED, ENOMEM); + return NULL; + } } } @@ -1826,16 +1939,868 @@ static flb_sds_t flb_opentelemetry_logs_to_otlp_json_render( return json; } +/* ========================================================================= + * Streaming OTLP/JSON log encoder + * ========================================================================= + * Writes OTLP JSON directly into a growing flb_sds_t output buffer in a + * single pass over the msgpack event chunk. No intermediate JSON AST is + * built, so the allocation cost per flush drops from ~80×N malloc()/free() + * calls (one per AST node, N records) to ~4 geometric SDS reallocs total. + * + * The pretty-print variant still uses the AST path; only the hot production + * path is replaced here. 
+ * ========================================================================= */
+
+/* Raw append of s[0..l-1] to *b; yields 0 on success and -1 when out of memory */
+#define _SJ(b, s, l) flb_sds_cat_safe((b), (s), (int)(l))
+#define _SJL(b, s) _SJ((b), (s), (int)(sizeof(s) - 1))
+
+/* Variants for helpers returning int: a failed append returns -1 at once */
+#define SJCAT(b, s, l) do { if (_SJ((b),(s),(l)) != 0) return -1; } while(0)
+#define SJCATL(b, s) do { if (_SJL((b),(s)) != 0) return -1; } while(0)
+
+/* Variants for the flb_sds_t-returning entry point: jump to its 'oom' label */
+#define GJCATL(b, s) do { if (_SJL((b),(s)) != 0) goto oom; } while(0)
+#define GCALL(f) do { if ((f) != 0) goto oom; } while(0)
+
+/*
+ * Forward declaration: stream_otlp_any_value and stream_otlp_kv call each
+ * other (a kvlist value holds key-value pairs whose values are AnyValues).
+ */
+static int stream_otlp_any_value(flb_sds_t *buf, msgpack_object *obj);
+
+/*
+ * Write str[0..len-1] to *buf as a double-quoted JSON string.
+ *
+ * The characters '"', '\\' and every byte below 0x20 are escaped; all other
+ * bytes are copied through untouched. Consecutive unescaped bytes are
+ * appended with one call, so a string that needs no escaping costs a single
+ * append between the two quotes.
+ *
+ * Returns 0 on success, -1 if an append fails.
+ */
+static int stream_json_str(flb_sds_t *buf, const char *str, size_t len)
+{
+    static const char digits[] = "0123456789abcdef";
+    char unicode_esc[6] = { '\\', 'u', '0', '0', '0', '0' };
+    const char *seq;
+    size_t seq_len;
+    size_t pos;
+    size_t pending = 0;   /* first byte not yet copied to *buf */
+    unsigned char ch;
+
+    SJCATL(buf, "\"");
+
+    for (pos = 0; pos < len; pos++) {
+        ch = (unsigned char) str[pos];
+
+        if (ch >= 0x20 && ch != '"' && ch != '\\') {
+            continue;   /* still inside a run of plain bytes */
+        }
+
+        /* flush the plain run that ends right before this byte */
+        if (pos > pending) {
+            SJCAT(buf, str + pending, pos - pending);
+        }
+
+        /* pick the escape sequence; short forms are two bytes long */
+        seq_len = 2;
+        switch (ch) {
+        case '"':
+            seq = "\\\"";
+            break;
+        case '\\':
+            seq = "\\\\";
+            break;
+        case '\b':
+            seq = "\\b";
+            break;
+        case '\f':
+            seq = "\\f";
+            break;
+        case '\n':
+            seq = "\\n";
+            break;
+        case '\r':
+            seq = "\\r";
+            break;
+        case '\t':
+            seq = "\\t";
+            break;
+        default:
+            /* remaining control bytes use the \u00XX form */
+            unicode_esc[4] = digits[(ch >> 4) & 0xf];
+            unicode_esc[5] = digits[ch & 0xf];
+            seq = unicode_esc;
+            seq_len = sizeof(unicode_esc);
+            break;
+        }
+
+        SJCAT(buf, seq, seq_len);
+        pending = pos + 1;
+    }
+
+    /* trailing plain run, if any */
+    if (len > pending) {
+        SJCAT(buf, str + pending, len - pending);
+    }
+
+    return _SJL(buf, "\"");
+}
+
+/*
+ * Base64-encode data[0..len-1] with binary_to_base64_sds() and append the
+ * encoded text (without quotes) to *buf.
+ *
+ * Returns 0 on success, -1 if encoding or the append fails.
+ */
+static int stream_base64_append(flb_sds_t *buf, const char *data, size_t len)
+{
+    flb_sds_t b64;
+    int rc;
+
+    if (binary_to_base64_sds(data, len, &b64) != 0) {
+        return -1;
+    }
+
+    rc = _SJ(buf, b64, (int) flb_sds_len(b64));
+    flb_sds_destroy(b64);
+
+    return rc;
+}
+
+/*
+ * Append a trace-id or span-id field value.
+ * Uses hex encoding when the binary length matches the expected OTLP size
+ * (16 bytes for trace, 8 for span), falling back to base64 otherwise —
+ * matching the behaviour of add_binary_id_field() in the AST path.
+ */
+static int stream_binary_id(flb_sds_t *buf,
+                            msgpack_object *obj,
+                            size_t expected_len)
+{
+    char hex_buf[33]; /* 16 bytes → 32 hex chars + NUL */
+
+    if (obj == NULL || obj->type != MSGPACK_OBJECT_BIN) {
+        return 0; /* field absent — nothing is written */
+    }
+
+    if (obj->via.bin.size == expected_len &&
+        binary_to_hex(hex_buf, sizeof(hex_buf),
+                      obj->via.bin.ptr, obj->via.bin.size) == 0) {
+        return _SJ(buf, hex_buf, (int)(expected_len * 2));
+    }
+
+    /* unexpected length, or hex conversion refused the input */
+    return stream_base64_append(buf, obj->via.bin.ptr, obj->via.bin.size);
+}
+
+/*
+ * Append one OTLP KeyValue JSON object:
+ *   {"key":"<key>","value":<AnyValue>}
+ *
+ * Returns 0 on success, -1 if 'key' is not a msgpack string or if any
+ * append fails.
+ */
+static int stream_otlp_kv(flb_sds_t *buf,
+                          msgpack_object *key,
+                          msgpack_object *val)
+{
+    if (key->type != MSGPACK_OBJECT_STR) {
+        return -1;
+    }
+
+    SJCATL(buf, "{\"key\":");
+    if (stream_json_str(buf, key->via.str.ptr, key->via.str.size) != 0) {
+        return -1;
+    }
+
+    SJCATL(buf, ",\"value\":");
+    if (stream_otlp_any_value(buf, val) != 0) {
+        return -1;
+    }
+
+    return _SJL(buf, "}");
+}
+
+/*
+ * Append the OTLP AnyValue JSON wrapper for a msgpack object.
+ *
+ *   str            -> {"stringValue":"..."}
+ *   boolean        -> {"boolValue":true|false}
+ *   integers       -> {"intValue":"<decimal string>"}
+ *   float32/64     -> {"doubleValue":<number>} or, for non-finite values,
+ *                     {"doubleValue":"NaN"|"Infinity"|"-Infinity"}
+ *   bin            -> {"bytesValue":"<base64>"}
+ *   array          -> {"arrayValue":{"values":[...]}}
+ *   map            -> {"kvlistValue":{"values":[...]}}
+ *   NULL, nil, ext -> {}
+ *
+ * Returns 0 on success, -1 on append failure or when a nested map holds a
+ * non-string key.
+ */
+static int stream_otlp_any_value(flb_sds_t *buf, msgpack_object *obj)
+{
+    char tmp[64];
+    int n = 0;
+    size_t i;
+    double number;
+
+    if (obj == NULL || obj->type == MSGPACK_OBJECT_NIL) {
+        return _SJL(buf, "{}");
+    }
+
+    switch (obj->type) {
+    case MSGPACK_OBJECT_STR:
+        SJCATL(buf, "{\"stringValue\":");
+        if (stream_json_str(buf, obj->via.str.ptr, obj->via.str.size) != 0) {
+            return -1;
+        }
+        return _SJL(buf, "}");
+
+    case MSGPACK_OBJECT_BOOLEAN:
+        return obj->via.boolean
+            ? _SJL(buf, "{\"boolValue\":true}")
+            : _SJL(buf, "{\"boolValue\":false}");
+
+    case MSGPACK_OBJECT_POSITIVE_INTEGER:
+        /* OTLP JSON spec: int64 values MUST be decimal strings */
+        n = snprintf(tmp, sizeof(tmp),
+                     "{\"intValue\":\"%" PRIu64 "\"}", obj->via.u64);
+        break;
+
+    case MSGPACK_OBJECT_NEGATIVE_INTEGER:
+        n = snprintf(tmp, sizeof(tmp),
+                     "{\"intValue\":\"%" PRId64 "\"}", obj->via.i64);
+        break;
+
+    case MSGPACK_OBJECT_FLOAT32:
+    case MSGPACK_OBJECT_FLOAT64:
+        number = obj->via.f64;
+
+        /*
+         * printf would render non-finite values as "nan"/"inf", which is
+         * not valid JSON. Emit the string forms instead. The checks avoid
+         * <math.h>: NaN is the only value unequal to itself, and
+         * (inf - inf) is NaN, hence non-zero.
+         */
+        if (number != number) {
+            return _SJL(buf, "{\"doubleValue\":\"NaN\"}");
+        }
+        if (number - number != 0.0) {
+            return number > 0.0
+                ? _SJL(buf, "{\"doubleValue\":\"Infinity\"}")
+                : _SJL(buf, "{\"doubleValue\":\"-Infinity\"}");
+        }
+
+        n = snprintf(tmp, sizeof(tmp), "{\"doubleValue\":%.17g}", number);
+        break;
+
+    case MSGPACK_OBJECT_BIN:
+        SJCATL(buf, "{\"bytesValue\":\"");
+        if (stream_base64_append(buf, obj->via.bin.ptr, obj->via.bin.size) != 0) {
+            return -1;
+        }
+        return _SJL(buf, "\"}");
+
+    case MSGPACK_OBJECT_ARRAY:
+        SJCATL(buf, "{\"arrayValue\":{\"values\":[");
+        for (i = 0; i < obj->via.array.size; i++) {
+            if (i > 0) {
+                SJCATL(buf, ",");
+            }
+            if (stream_otlp_any_value(buf, &obj->via.array.ptr[i]) != 0) {
+                return -1;
+            }
+        }
+        return _SJL(buf, "]}}");
+
+    case MSGPACK_OBJECT_MAP:
+        SJCATL(buf, "{\"kvlistValue\":{\"values\":[");
+        for (i = 0; i < obj->via.map.size; i++) {
+            if (i > 0) {
+                SJCATL(buf, ",");
+            }
+            if (stream_otlp_kv(buf,
+                               &obj->via.map.ptr[i].key,
+                               &obj->via.map.ptr[i].val) != 0) {
+                return -1;
+            }
+        }
+        return _SJL(buf, "]}}");
+
+    default:
+        return _SJL(buf, "{}");
+    }
+
+    /* only the numeric cases get here, with the rendering in tmp */
+    if (n < 0 || (size_t) n >= sizeof(tmp)) {
+        return -1;
+    }
+    return _SJ(buf, tmp, n);
+}
+
+/*
+ * Append all key-value pairs from a msgpack map as comma-separated OTLP
+ * KeyValue objects (no surrounding brackets). A NULL or non-map argument
+ * appends nothing and returns 0.
+ */
+static int stream_otlp_kv_from_map(flb_sds_t *buf, msgpack_object *map)
+{
+    size_t i;
+
+    if (map == NULL || map->type != MSGPACK_OBJECT_MAP) {
+        return 0;
+    }
+
+    for (i = 0; i < map->via.map.size; i++) {
+        if (i > 0) {
+            SJCATL(buf, ",");
+        }
+        if (stream_otlp_kv(buf,
+                           &map->via.map.ptr[i].key,
+                           &map->via.map.ptr[i].val) != 0) {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Same as stream_otlp_kv_from_map but skip one key by exact name match.
+ * Used when emitting body-key attributes: the matched body key is excluded + * from the attribute list. + */ +static int stream_otlp_kv_from_map_filtered(flb_sds_t *buf, + msgpack_object *map, + const char *skip_key, + size_t skip_len) +{ + size_t i; + int first = 1; + msgpack_object *key; + + if (map == NULL || map->type != MSGPACK_OBJECT_MAP) { + return 0; + } + for (i = 0; i < map->via.map.size; i++) { + key = &map->via.map.ptr[i].key; + if (key->type != MSGPACK_OBJECT_STR) { + return -1; + } + if (skip_key != NULL && + key->via.str.size == skip_len && + memcmp(key->via.str.ptr, skip_key, skip_len) == 0) { + continue; + } + if (!first) { SJCATL(buf, ","); } + if (stream_otlp_kv(buf, key, &map->via.map.ptr[i].val) != 0) return -1; + first = 0; + } + return 0; +} + +/* + * Append an OTLP Resource JSON object: + * {"attributes":[...],"droppedAttributesCount":N} + * Mirrors logs_group_resource_to_json() from the AST path. + */ +/* + * Append an OTLP Resource JSON object. + * Mirrors logs_group_resource_to_json(): only writes "attributes" when the + * resource map contains that key, so a NULL/empty resource produces "{}" + * rather than "{\"attributes\":[]}". 
+ */ +static int stream_otlp_resource(flb_sds_t *buf, msgpack_object *res) +{ + msgpack_object *attrs; + msgpack_object *dropped; + int first = 1; + char tmp[48]; + int n; + + SJCATL(buf, "{"); + + if (res != NULL && res->type == MSGPACK_OBJECT_MAP) { + attrs = msgpack_map_get_object(&res->via.map, "attributes"); + if (attrs != NULL) { + SJCATL(buf, "\"attributes\":["); + if (attrs->type == MSGPACK_OBJECT_MAP) { + if (stream_otlp_kv_from_map(buf, attrs) != 0) return -1; + } + SJCATL(buf, "]"); + first = 0; + } + + dropped = msgpack_map_get_object(&res->via.map, "dropped_attributes_count"); + if (dropped != NULL && + dropped->type == MSGPACK_OBJECT_POSITIVE_INTEGER && + dropped->via.u64 > 0) { + if (!first) { SJCATL(buf, ","); } + n = snprintf(tmp, sizeof(tmp), + "\"droppedAttributesCount\":%" PRIu64, dropped->via.u64); + SJCAT(buf, tmp, n); + first = 0; + } + } + + (void) first; + return _SJL(buf, "}"); +} + +/* + * Append an OTLP InstrumentationScope JSON object: + * {"name":"...","version":"...","attributes":[...]} + * Mirrors logs_group_scope_to_json() from the AST path. 
+ */ +static int stream_otlp_scope(flb_sds_t *buf, msgpack_object *scope) +{ + msgpack_object *field; + int first = 1; + char tmp[48]; + int n; + + SJCATL(buf, "{"); + + if (scope == NULL || scope->type != MSGPACK_OBJECT_MAP) { + return _SJL(buf, "}"); + } + + field = msgpack_map_get_object(&scope->via.map, "name"); + if (field != NULL && field->type == MSGPACK_OBJECT_STR) { + SJCATL(buf, "\"name\":"); + if (stream_json_str(buf, field->via.str.ptr, field->via.str.size) != 0) return -1; + first = 0; + } + + field = msgpack_map_get_object(&scope->via.map, "version"); + if (field != NULL && field->type == MSGPACK_OBJECT_STR) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"version\":"); + if (stream_json_str(buf, field->via.str.ptr, field->via.str.size) != 0) return -1; + first = 0; + } + + field = msgpack_map_get_object(&scope->via.map, "attributes"); + if (field != NULL && field->type == MSGPACK_OBJECT_MAP && field->via.map.size > 0) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"attributes\":["); + if (stream_otlp_kv_from_map(buf, field) != 0) return -1; + SJCATL(buf, "]"); + first = 0; + } + + field = msgpack_map_get_object(&scope->via.map, "dropped_attributes_count"); + if (field != NULL && + field->type == MSGPACK_OBJECT_POSITIVE_INTEGER && + field->via.u64 > 0) { + if (!first) { SJCATL(buf, ","); } + n = snprintf(tmp, sizeof(tmp), + "\"droppedAttributesCount\":%" PRIu64, field->via.u64); + SJCAT(buf, tmp, n); + first = 0; + } + + (void) first; + return _SJL(buf, "}"); +} + +/* + * Append a complete OTLP LogRecord JSON object, replicating the field set + * and precedence rules of log_record_to_json() from the AST path. 
+ */ +static int stream_otlp_log_record(flb_sds_t *buf, + struct flb_log_event *event, + const char **body_keys, + size_t body_key_count, + int body_key_attrs) +{ + msgpack_object *metadata = event->metadata; + msgpack_object *otlp_meta = NULL; + msgpack_object *field; + msgpack_object *body_val; + const char *matched_key = NULL; + size_t matched_key_len = 0; + uint64_t ts; + int first = 1; + int attrs_written = FLB_FALSE; + char tmp[64]; + int n; + + SJCATL(buf, "{"); + + if (metadata != NULL && metadata->type == MSGPACK_OBJECT_MAP) { + otlp_meta = msgpack_map_get_object(&metadata->via.map, + FLB_OTEL_LOGS_METADATA_KEY); + } + + /* timeUnixNano — prefer the OTLP-preserved timestamp, fall back to the + * normalised event timestamp. */ + if (otlp_meta != NULL && otlp_meta->type == MSGPACK_OBJECT_MAP && + otlp_uint64_field_value(&otlp_meta->via.map, "timestamp", &ts) == 0) { + /* OTLP timestamp already extracted */ + } + else if (event->raw_timestamp != NULL) { + if (event->raw_timestamp->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + ts = event->raw_timestamp->via.u64; + } + else if (event->raw_timestamp->type == MSGPACK_OBJECT_NEGATIVE_INTEGER && + event->raw_timestamp->via.i64 >= 0) { + ts = (uint64_t) event->raw_timestamp->via.i64; + } + else { + ts = flb_time_to_nanosec(&event->timestamp); + } + } + else { + ts = flb_time_to_nanosec(&event->timestamp); + } + + if (ts > 0) { + n = snprintf(tmp, sizeof(tmp), "\"timeUnixNano\":\"%" PRIu64 "\"", ts); + SJCAT(buf, tmp, n); + first = 0; + } + + if (otlp_meta != NULL && otlp_meta->type == MSGPACK_OBJECT_MAP) { + + /* observedTimeUnixNano */ + field = msgpack_map_get_object(&otlp_meta->via.map, "observed_timestamp"); + if (field != NULL) { + uint64_t obs = 0; + if (field->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + obs = field->via.u64; + } + else if (field->type == MSGPACK_OBJECT_NEGATIVE_INTEGER && + field->via.i64 >= 0) { + obs = (uint64_t) field->via.i64; + } + if (obs > 0) { + if (!first) { SJCATL(buf, ","); } + n = 
snprintf(tmp, sizeof(tmp), + "\"observedTimeUnixNano\":\"%" PRIu64 "\"", obs); + SJCAT(buf, tmp, n); + first = 0; + } + } + + /* severityNumber */ + field = msgpack_map_get_object(&otlp_meta->via.map, "severity_number"); + if (field != NULL) { + uint64_t sev = 0; + if (field->type == MSGPACK_OBJECT_POSITIVE_INTEGER) { + sev = field->via.u64; + } + else if (field->type == MSGPACK_OBJECT_NEGATIVE_INTEGER && + field->via.i64 >= 0) { + sev = (uint64_t) field->via.i64; + } + if (sev > 0) { + if (!first) { SJCATL(buf, ","); } + n = snprintf(tmp, sizeof(tmp), "\"severityNumber\":%" PRIu64, sev); + SJCAT(buf, tmp, n); + first = 0; + } + } + + /* severityText */ + field = msgpack_map_get_object(&otlp_meta->via.map, "severity_text"); + if (field != NULL && field->type == MSGPACK_OBJECT_STR) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"severityText\":"); + if (stream_json_str(buf, field->via.str.ptr, field->via.str.size) != 0) return -1; + first = 0; + } + + /* attributes (from OTLP metadata) */ + field = msgpack_map_get_object(&otlp_meta->via.map, "attributes"); + if (field != NULL && field->type == MSGPACK_OBJECT_MAP && + field->via.map.size > 0) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"attributes\":["); + if (stream_otlp_kv_from_map(buf, field) != 0) return -1; + SJCATL(buf, "]"); + attrs_written = FLB_TRUE; + first = 0; + } + + /* traceId */ + field = msgpack_map_get_object(&otlp_meta->via.map, "trace_id"); + if (field != NULL && field->type == MSGPACK_OBJECT_BIN) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"traceId\":\""); + if (stream_binary_id(buf, field, 16) != 0) return -1; + SJCATL(buf, "\""); + first = 0; + } + + /* spanId */ + field = msgpack_map_get_object(&otlp_meta->via.map, "span_id"); + if (field != NULL && field->type == MSGPACK_OBJECT_BIN) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"spanId\":\""); + if (stream_binary_id(buf, field, 8) != 0) return -1; + SJCATL(buf, "\""); + first = 0; + } + } + + /* body — use 
find_log_body_candidate to honour body_keys config */ + body_val = find_log_body_candidate(event->body, + body_keys, body_key_count, + &matched_key, &matched_key_len); + + /* attributes from body map (body_key_attributes=true) */ + if (body_key_attrs == FLB_TRUE && + matched_key != NULL && + event->body != NULL && + event->body->type == MSGPACK_OBJECT_MAP && + attrs_written == FLB_FALSE) { + size_t i; + int any = 0; + for (i = 0; i < event->body->via.map.size; i++) { + msgpack_object *k = &event->body->via.map.ptr[i].key; + if (k->type == MSGPACK_OBJECT_STR && + !(k->via.str.size == matched_key_len && + memcmp(k->via.str.ptr, matched_key, matched_key_len) == 0)) { + any = 1; + break; + } + } + if (any) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"attributes\":["); + if (stream_otlp_kv_from_map_filtered(buf, event->body, + matched_key, + matched_key_len) != 0) return -1; + SJCATL(buf, "]"); + first = 0; + } + } + + if (body_val != NULL) { + if (!first) { SJCATL(buf, ","); } + SJCATL(buf, "\"body\":"); + if (stream_otlp_any_value(buf, body_val) != 0) return -1; + first = 0; + } + + (void) first; + return _SJL(buf, "}"); +} + +/* + * Main streaming render function. + * + * State machine mirrors flb_opentelemetry_logs_to_otlp_json_render() but + * writes JSON directly to a single flb_sds_t buffer rather than building an + * AST first. Groups are NOT deduplicated across GROUP_START events; each + * GROUP_START opens a new resourceLogs entry. For all real-world OTLP + * producers this is equivalent, since a given resource+scope pair appears as + * a single contiguous group within one chunk. + */ +/* + * Per-resource scope counter used to enforce max_scopes across GROUP_START + * events that share the same resource identity hash. Stack-allocated; 128 + * distinct resources per chunk is more than enough for any real workload. 
+ */ +#define STREAM_MAX_RES_TRACK 128 +typedef struct { uint64_t hash; int scopes; } stream_res_scope_t; + +static flb_sds_t otlp_logs_stream_render( + const void *data, + size_t size, + const struct flb_opentelemetry_otlp_logs_options *opts, + int *result) +{ + /* ── option resolution (mirrors flb_opentelemetry_logs_to_otlp_json_render) */ + static const char *default_body_keys[] = {"log", "message"}; + const char **body_keys = default_body_keys; + size_t body_key_count = 2; + const char *body_key_ptr = NULL; /* wrapper for singular key */ + int body_key_attrs = FLB_FALSE; + int require_meta = FLB_TRUE; /* same default as AST path */ + int max_resources = 0; + int max_scopes = 0; + + if (opts != NULL) { + require_meta = opts->logs_require_otel_metadata; + body_key_attrs = opts->logs_body_key_attributes; + max_resources = opts->max_resources; + max_scopes = opts->max_scopes; + + if (opts->logs_body_keys != NULL && opts->logs_body_key_count > 0) { + body_keys = opts->logs_body_keys; + body_key_count = opts->logs_body_key_count; + } + else if (opts->logs_body_key != NULL) { + body_key_ptr = opts->logs_body_key; + body_keys = &body_key_ptr; + body_key_count = 1; + } + } + + /* ── per-resource scope counter (for max_scopes enforcement) */ + stream_res_scope_t res_scope[STREAM_MAX_RES_TRACK]; + int res_scope_count = 0; + + /* ── state */ + int ret; + int record_type; + int in_group = FLB_FALSE; + int skip_group = FLB_FALSE; + int first_group = FLB_TRUE; + int first_record = FLB_TRUE; + int resource_count = 0; + int64_t resource_id; + int64_t scope_id; + /* Resource-level schemaUrl is emitted AFTER scopeLogs (matching the AST + * path in append_logs_resource_state). We store a pointer into the + * immutable input buffer so we can emit it at group-close time. 
*/ + const char *res_schema_url_ptr = NULL; + size_t res_schema_url_len = 0; + flb_sds_t buf; + msgpack_object *resource_obj; + msgpack_object *scope_obj; + msgpack_object *group_metadata; + msgpack_object *group_body; + msgpack_object *schema_url; + struct flb_log_event event; + struct flb_log_event_decoder decoder; + + /* ── OTLP pre-check (same as AST path when require_meta is set) */ + if (require_meta == FLB_TRUE && + flb_opentelemetry_logs_chunk_is_otlp(data, size) != FLB_TRUE) { + set_error(result, FLB_OPENTELEMETRY_OTLP_JSON_INVALID_LOG_EVENT, EINVAL); + return NULL; + } + + /* Pre-size at 4× the raw msgpack: OTLP JSON is typically 3–5× larger + * for K8s log records. SDS grows geometrically so underestimates are + * cheap (2–3 extra reallocs at most). */ + buf = flb_sds_create_size(size > 0 ? size * 4 : 256); + if (buf == NULL) { + set_error(result, FLB_OPENTELEMETRY_OTLP_JSON_NOT_SUPPORTED, ENOMEM); + return NULL; + } + + GJCATL(&buf, "{\"resourceLogs\":["); + + ret = flb_log_event_decoder_init(&decoder, (char *) data, size); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_sds_destroy(buf); + set_error(result, FLB_OPENTELEMETRY_OTLP_JSON_INVALID_LOG_EVENT, EINVAL); + return NULL; + } + flb_log_event_decoder_read_groups(&decoder, FLB_TRUE); + + while ((ret = flb_log_event_decoder_next(&decoder, &event)) == + FLB_EVENT_DECODER_SUCCESS) { + + ret = flb_log_event_decoder_get_record_type(&event, &record_type); + if (ret != 0) { + flb_log_event_decoder_destroy(&decoder); + flb_sds_destroy(buf); + set_error(result, FLB_OPENTELEMETRY_OTLP_JSON_INVALID_LOG_EVENT, EINVAL); + return NULL; + } + + /* ── GROUP_START ─────────────────────────────────────────────── */ + if (record_type == FLB_LOG_EVENT_GROUP_START) { + + if (in_group) { + /* close logRecords ] scope_entry } scopeLogs ] */ + GJCATL(&buf, "]}]"); + /* emit resource-level schemaUrl before closing the entry */ + if (res_schema_url_ptr != NULL) { + GJCATL(&buf, ",\"schemaUrl\":"); + 
GCALL(stream_json_str(&buf, res_schema_url_ptr, res_schema_url_len)); + res_schema_url_ptr = NULL; + } + /* close resource_entry } */ + GJCATL(&buf, "}"); + in_group = FLB_FALSE; + } + + skip_group = FLB_FALSE; + group_metadata = (event.group_metadata != NULL) + ? event.group_metadata : event.metadata; + group_body = event.body; + + if (group_metadata == NULL || + group_metadata->type != MSGPACK_OBJECT_MAP || + msgpack_map_entry_is_string(&group_metadata->via.map, + FLB_OTEL_LOGS_SCHEMA_KEY, + FLB_OTEL_LOGS_SCHEMA_OTLP) != FLB_TRUE || + msgpack_map_get_int64(&group_metadata->via.map, + "resource_id", &resource_id) != 0 || + msgpack_map_get_int64(&group_metadata->via.map, + "scope_id", &scope_id) != 0) { + if (require_meta == FLB_TRUE) { + flb_log_event_decoder_destroy(&decoder); + flb_sds_destroy(buf); + set_error(result, + FLB_OPENTELEMETRY_OTLP_JSON_INVALID_LOG_EVENT, EINVAL); + return NULL; + } + skip_group = FLB_TRUE; + continue; + } + + if (max_resources > 0 && resource_count >= max_resources) { + skip_group = FLB_TRUE; + continue; + } + + resource_obj = NULL; + scope_obj = NULL; + if (group_body != NULL && group_body->type == MSGPACK_OBJECT_MAP) { + resource_obj = msgpack_map_get_object(&group_body->via.map, "resource"); + scope_obj = msgpack_map_get_object(&group_body->via.map, "scope"); + } + + /* + * max_scopes enforcement: track how many scopes each resource + * has accumulated. We only hash when max_scopes is active to + * avoid the serialize-to-hash cost in the common (unlimited) case. 
+ */ + if (max_scopes > 0) { + uint64_t res_hash = resource_identity_hash(resource_obj, group_body); + int rs_idx; + for (rs_idx = 0; rs_idx < res_scope_count; rs_idx++) { + if (res_scope[rs_idx].hash == res_hash) { + break; + } + } + if (rs_idx < res_scope_count) { + if (res_scope[rs_idx].scopes >= max_scopes) { + skip_group = FLB_TRUE; + continue; + } + res_scope[rs_idx].scopes++; + } + else if (res_scope_count < STREAM_MAX_RES_TRACK) { + res_scope[res_scope_count].hash = res_hash; + res_scope[res_scope_count].scopes = 1; + res_scope_count++; + } + } + + if (!first_group) { GJCATL(&buf, ","); } + GJCATL(&buf, "{\"resource\":"); + GCALL(stream_otlp_resource(&buf, resource_obj)); + + /* Store resource-level schemaUrl; it is written AFTER scopeLogs + * (mirroring append_logs_resource_state in the AST path). */ + schema_url = resource_schema_url_object(resource_obj, group_body); + if (schema_url != NULL && schema_url->type == MSGPACK_OBJECT_STR) { + res_schema_url_ptr = schema_url->via.str.ptr; + res_schema_url_len = schema_url->via.str.size; + } + else { + res_schema_url_ptr = NULL; + } + + GJCATL(&buf, ",\"scopeLogs\":[{\"scope\":"); + GCALL(stream_otlp_scope(&buf, scope_obj)); + + if (scope_obj != NULL && scope_obj->type == MSGPACK_OBJECT_MAP) { + schema_url = msgpack_map_get_object(&scope_obj->via.map, "schema_url"); + if (schema_url != NULL && schema_url->type == MSGPACK_OBJECT_STR) { + GJCATL(&buf, ",\"schemaUrl\":"); + GCALL(stream_json_str(&buf, + schema_url->via.str.ptr, + schema_url->via.str.size)); + } + } + + GJCATL(&buf, ",\"logRecords\":["); + + in_group = FLB_TRUE; + first_group = FLB_FALSE; + first_record = FLB_TRUE; + resource_count++; + continue; + } + + /* ── GROUP_END ───────────────────────────────────────────────── */ + if (record_type == FLB_LOG_EVENT_GROUP_END) { + if (in_group) { + GJCATL(&buf, "]}]"); + if (res_schema_url_ptr != NULL) { + GJCATL(&buf, ",\"schemaUrl\":"); + GCALL(stream_json_str(&buf, res_schema_url_ptr, res_schema_url_len)); 
+ res_schema_url_ptr = NULL; + } + GJCATL(&buf, "}"); + in_group = FLB_FALSE; + } + skip_group = FLB_FALSE; + continue; + } + + /* ── NORMAL RECORD ───────────────────────────────────────────── */ + if (skip_group) { + continue; + } + + /* Open a default empty resource+scope for ungrouped records. */ + if (!in_group) { + if (max_resources > 0 && resource_count >= max_resources) { + continue; + } + if (!first_group) { GJCATL(&buf, ","); } + /* Empty resource/scope mirrors what logs_group_resource_to_json(NULL) + * and logs_group_scope_to_json(NULL) produce: bare "{}". */ + GJCATL(&buf, + "{\"resource\":{}," + "\"scopeLogs\":[{\"scope\":{}," + "\"logRecords\":["); + in_group = FLB_TRUE; + first_group = FLB_FALSE; + first_record = FLB_TRUE; + resource_count++; + } + + if (!first_record) { GJCATL(&buf, ","); } + + GCALL(stream_otlp_log_record(&buf, &event, + body_keys, body_key_count, body_key_attrs)); + + first_record = FLB_FALSE; + } + + if (in_group) { + GJCATL(&buf, "]}]"); + if (res_schema_url_ptr != NULL) { + GJCATL(&buf, ",\"schemaUrl\":"); + GCALL(stream_json_str(&buf, res_schema_url_ptr, res_schema_url_len)); + } + GJCATL(&buf, "}"); + } + + GJCATL(&buf, "]}"); + + flb_log_event_decoder_destroy(&decoder); + set_result(result, FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + return buf; + +oom: + flb_log_event_decoder_destroy(&decoder); + flb_sds_destroy(buf); + set_error(result, FLB_OPENTELEMETRY_OTLP_JSON_NOT_SUPPORTED, ENOMEM); + return NULL; +} +#undef STREAM_MAX_RES_TRACK + +/* Clean up the streaming-encoder-local macros. 
*/ +#undef _SJ +#undef _SJL +#undef SJCAT +#undef SJCATL +#undef GJCATL +#undef GCALL + flb_sds_t flb_opentelemetry_logs_to_otlp_json(const void *event_chunk_data, size_t event_chunk_size, struct flb_opentelemetry_otlp_logs_options *options, int *result) { - return flb_opentelemetry_logs_to_otlp_json_render(event_chunk_data, - event_chunk_size, - options, - FLB_FALSE, - result); + return otlp_logs_stream_render(event_chunk_data, event_chunk_size, + options, result); } flb_sds_t flb_opentelemetry_logs_to_otlp_json_pretty(const void *event_chunk_data, diff --git a/src/opentelemetry/flb_opentelemetry_otlp_proto.c b/src/opentelemetry/flb_opentelemetry_otlp_proto.c index 93cdbd16960..8fbc593c68e 100644 --- a/src/opentelemetry/flb_opentelemetry_otlp_proto.c +++ b/src/opentelemetry/flb_opentelemetry_otlp_proto.c @@ -722,6 +722,15 @@ static void destroy_logs_resource_states( flb_free(states); } +static const char *body_key_lookup_name(const char *pattern) +{ + if (pattern != NULL && pattern[0] == '$' && pattern[1] != '\0') { + return pattern + 1; + } + + return pattern; +} + static msgpack_object *find_log_body_candidate(msgpack_object *body, const char **logs_body_keys, size_t logs_body_key_count, @@ -729,6 +738,7 @@ static msgpack_object *find_log_body_candidate(msgpack_object *body, size_t *matched_key_length) { size_t index; + const char *lookup; msgpack_object *candidate; if (body == NULL || body->type == MSGPACK_OBJECT_NIL) { @@ -744,13 +754,14 @@ static msgpack_object *find_log_body_candidate(msgpack_object *body, continue; } - candidate = msgpack_map_get_object(&body->via.map, logs_body_keys[index]); + lookup = body_key_lookup_name(logs_body_keys[index]); + candidate = msgpack_map_get_object(&body->via.map, lookup); if (candidate != NULL) { if (matched_key != NULL) { - *matched_key = logs_body_keys[index]; + *matched_key = lookup; } if (matched_key_length != NULL) { - *matched_key_length = strlen(logs_body_keys[index]); + *matched_key_length = strlen(lookup); } 
return candidate; } diff --git a/tests/internal/opentelemetry.c b/tests/internal/opentelemetry.c index 404cca8e1b9..93b1575949b 100644 --- a/tests/internal/opentelemetry.c +++ b/tests/internal/opentelemetry.c @@ -2735,6 +2735,591 @@ void test_opentelemetry_traces_otlp_proto_roundtrip() ctr_destroy(trace_context); } +/* + * Test that the multi-key body_keys array path (used by otel_process_logs_json) + * extracts the log body from a matching key and ignores unmatched keys. + */ +/* + * Encode a single plain-log record via the body_keys array path and verify the + * OTLP JSON output matches exactly what the default $message key would produce. + */ +void test_opentelemetry_logs_otlp_json_body_keys_array() +{ + int ret; + int result; + flb_sds_t actual; + flb_sds_t normalized_expected; + struct flb_time timestamp; + struct flb_log_event_encoder encoder; + struct flb_opentelemetry_otlp_logs_options options; + const char *body_keys[] = { "$log", "$message" }; + char *expected = + "{\"resourceLogs\":[{\"resource\":{},\"scopeLogs\":[{\"scope\":{}," + "\"logRecords\":[{\"timeUnixNano\":\"1640995200000000000\"," + "\"body\":{\"stringValue\":\"hello json encoding\"}}]}]}]}"; + + timestamp.tm.tv_sec = 1640995200; + timestamp.tm.tv_nsec = 0; + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return; + } + + ret = flb_log_event_encoder_begin_record(&encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_set_timestamp(&encoder, ×tamp); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_append_body_values( + &encoder, + FLB_LOG_EVENT_CSTRING_VALUE("message"), + FLB_LOG_EVENT_CSTRING_VALUE("hello json 
encoding")); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_commit_record(&encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + memset(&options, 0, sizeof(options)); + options.logs_require_otel_metadata = FLB_FALSE; + options.logs_body_keys = body_keys; + options.logs_body_key_count = 2; + options.logs_body_key_attributes = FLB_FALSE; + + actual = flb_opentelemetry_logs_to_otlp_json(encoder.output_buffer, + encoder.output_length, + &options, + &result); + TEST_CHECK(actual != NULL); + TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + if (actual == NULL) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + normalized_expected = test_normalize_json(expected); + TEST_CHECK(normalized_expected != NULL); + if (normalized_expected == NULL) { + flb_sds_destroy(actual); + flb_log_event_encoder_destroy(&encoder); + return; + } + + TEST_CHECK(strcmp(normalized_expected, actual) == 0); + + flb_sds_destroy(normalized_expected); + flb_sds_destroy(actual); + flb_log_event_encoder_destroy(&encoder); +} + +/* + * When logs_body_key_attributes=true, keys that are NOT the designated body key + * should appear in the logRecord's attributes instead of being dropped. 
+ */ +void test_opentelemetry_logs_otlp_json_body_key_attributes() +{ + int ret; + int result; + flb_sds_t actual; + struct flb_time timestamp; + struct flb_log_event_encoder encoder; + struct flb_opentelemetry_otlp_logs_options options; + const char *body_keys[] = { "$message" }; + + timestamp.tm.tv_sec = 1640995200; + timestamp.tm.tv_nsec = 0; + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return; + } + + ret = flb_log_event_encoder_begin_record(&encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_set_timestamp(&encoder, ×tamp); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + /* "message" is the designated body key; "level" is a remaining key → attribute */ + ret = flb_log_event_encoder_append_body_values( + &encoder, + FLB_LOG_EVENT_CSTRING_VALUE("message"), + FLB_LOG_EVENT_CSTRING_VALUE("body text")); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_append_body_values( + &encoder, + FLB_LOG_EVENT_CSTRING_VALUE("level"), + FLB_LOG_EVENT_CSTRING_VALUE("info")); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_commit_record(&encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + memset(&options, 0, sizeof(options)); + options.logs_require_otel_metadata = FLB_FALSE; + options.logs_body_keys = body_keys; + options.logs_body_key_count = 1; + 
options.logs_body_key_attributes = FLB_TRUE; + + actual = flb_opentelemetry_logs_to_otlp_json(encoder.output_buffer, + encoder.output_length, + &options, + &result); + TEST_CHECK(actual != NULL); + TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + if (actual == NULL) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + /* "message" must be the body; "level" must appear as an attribute */ + TEST_CHECK(strstr(actual, "\"stringValue\":\"body text\"") != NULL); + TEST_CHECK(strstr(actual, "\"level\"") != NULL); + TEST_CHECK(strstr(actual, "\"info\"") != NULL); + /* the body key itself must NOT bleed into attributes */ + TEST_CHECK(strstr(actual, "\"message\"") == NULL); + + flb_sds_destroy(actual); + flb_log_event_encoder_destroy(&encoder); +} + +void test_opentelemetry_metrics_otlp_json_add_label() +{ + int ret; + int result; + char *msgpack_buffer; + size_t msgpack_size; + flb_sds_t actual; + struct cfl_list contexts; + struct cmt *context; + char *input = + "{\"resourceMetrics\":[{\"resource\":{\"attributes\":[{\"key\":\"service.name\"," + "\"value\":{\"stringValue\":\"svc\"}}]},\"scopeMetrics\":[{\"scope\":{\"name\":\"scope-a\"," + "\"version\":\"1.0.0\"},\"metrics\":[{\"name\":\"requests_total\",\"description\":\"count\"," + "\"unit\":\"1\",\"sum\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"method\"," + "\"value\":{\"stringValue\":\"GET\"}}],\"timeUnixNano\":\"1704067201000000000\"," + "\"startTimeUnixNano\":\"1704067200000000000\",\"asInt\":\"42\"}]," + "\"aggregationTemporality\":2,\"isMonotonic\":true}}]}]}]}"; + + cfl_list_init(&contexts); + + ret = flb_opentelemetry_metrics_json_to_cmt(&contexts, input, strlen(input)); + TEST_CHECK(ret == 0); + if (ret != 0 || cfl_list_is_empty(&contexts)) { + destroy_metrics_context_list(&contexts); + return; + } + + context = cfl_list_entry_first(&contexts, struct cmt, _head); + + ret = cmt_label_add(context, "env", "prod"); + TEST_CHECK(ret == 0); + if (ret != 0) { + 
destroy_metrics_context_list(&contexts); + return; + } + + ret = cmt_encode_msgpack_create(context, &msgpack_buffer, &msgpack_size); + TEST_CHECK(ret == 0); + if (ret != 0) { + destroy_metrics_context_list(&contexts); + return; + } + + actual = flb_opentelemetry_metrics_msgpack_to_otlp_json(msgpack_buffer, + msgpack_size, + &result); + TEST_CHECK(actual != NULL); + TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + if (actual == NULL) { + flb_free(msgpack_buffer); + destroy_metrics_context_list(&contexts); + return; + } + + TEST_CHECK(strstr(actual, "\"env\"") != NULL); + TEST_CHECK(strstr(actual, "\"prod\"") != NULL); + + flb_free(msgpack_buffer); + flb_sds_destroy(actual); + destroy_metrics_context_list(&contexts); +} + +void test_opentelemetry_logs_otlp_json_max_resources_cap() +{ + int ret; + int result; + flb_sds_t actual; + struct flb_log_event_encoder encoder; + struct flb_opentelemetry_otlp_logs_options options; + char *input_a = + "{\"resourceLogs\":[{\"resource\":{\"attributes\":[{\"key\":\"user.id\"," + "\"value\":{\"stringValue\":\"user-a\"}}]},\"scopeLogs\":[{\"scope\":{}," + "\"logRecords\":[{\"timeUnixNano\":\"1640995200000000000\"," + "\"body\":{\"stringValue\":\"event-a\"}}]}]}]}"; + char *input_b = + "{\"resourceLogs\":[{\"resource\":{\"attributes\":[{\"key\":\"user.id\"," + "\"value\":{\"stringValue\":\"user-b\"}}]},\"scopeLogs\":[{\"scope\":{}," + "\"logRecords\":[{\"timeUnixNano\":\"1640995201000000000\"," + "\"body\":{\"stringValue\":\"event-b\"}}]}]}]}"; + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return; + } + + ret = flb_opentelemetry_logs_json_to_msgpack(&encoder, + input_a, strlen(input_a), + "log", &result); + TEST_CHECK(ret == 0); + TEST_CHECK(result == 0); + + ret = flb_opentelemetry_logs_json_to_msgpack(&encoder, + input_b, strlen(input_b), + "log", &result); + TEST_CHECK(ret == 0); + 
TEST_CHECK(result == 0); + if (ret != 0 || result != 0) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + memset(&options, 0, sizeof(options)); + options.logs_require_otel_metadata = FLB_TRUE; + options.max_resources = 1; + + actual = flb_opentelemetry_logs_to_otlp_json(encoder.output_buffer, + encoder.output_length, + &options, + &result); + TEST_CHECK(actual != NULL); + TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + if (actual == NULL) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + /* first resource and its record must be present */ + TEST_CHECK(strstr(actual, "user-a") != NULL); + TEST_CHECK(strstr(actual, "event-a") != NULL); + /* second resource must be silently dropped */ + TEST_CHECK(strstr(actual, "user-b") == NULL); + TEST_CHECK(strstr(actual, "event-b") == NULL); + + flb_sds_destroy(actual); + flb_log_event_encoder_destroy(&encoder); +} + +/* + * Verify that max_scopes=1 causes the second scopeLogs group to be silently + * dropped while the first scope's log record is preserved. + * + * Setup: one resource with two distinct scopes (scope-a and scope-b), each + * carrying one log record. With max_scopes=1 only scope-a/event-a should + * appear in the output JSON. 
+ */ +void test_opentelemetry_logs_otlp_json_max_scopes_cap() +{ + int ret; + int result; + flb_sds_t actual; + struct flb_log_event_encoder encoder; + struct flb_opentelemetry_otlp_logs_options options; + /* Two scope logs inside a single resource */ + char *input_two_scopes = + "{\"resourceLogs\":[{\"resource\":{\"attributes\":[{\"key\":\"svc\"," + "\"value\":{\"stringValue\":\"test\"}}]}," + "\"scopeLogs\":[" + "{\"scope\":{\"name\":\"scope-a\"},\"logRecords\":[{\"timeUnixNano\":" + "\"1640995200000000000\",\"body\":{\"stringValue\":\"event-a\"}}]}," + "{\"scope\":{\"name\":\"scope-b\"},\"logRecords\":[{\"timeUnixNano\":" + "\"1640995201000000000\",\"body\":{\"stringValue\":\"event-b\"}}]}" + "]}]}"; + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_DEFAULT); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return; + } + + ret = flb_opentelemetry_logs_json_to_msgpack(&encoder, + input_two_scopes, + strlen(input_two_scopes), + "log", + &result); + TEST_CHECK(ret == 0); + TEST_CHECK(result == 0); + if (ret != 0 || result != 0) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + memset(&options, 0, sizeof(options)); + options.logs_require_otel_metadata = FLB_TRUE; + options.max_scopes = 1; + + actual = flb_opentelemetry_logs_to_otlp_json(encoder.output_buffer, + encoder.output_length, + &options, + &result); + TEST_CHECK(actual != NULL); + TEST_MSG("flb_opentelemetry_logs_to_otlp_json returned NULL"); + TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + TEST_MSG("result code: %d", result); + if (actual == NULL) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + /* first scope must be present */ + TEST_CHECK(strstr(actual, "scope-a") != NULL); + TEST_MSG("scope-a not found in output: %s", actual); + TEST_CHECK(strstr(actual, "event-a") != NULL); + TEST_MSG("event-a not found in output: %s", actual); + /* second scope must be silently dropped */ + TEST_CHECK(strstr(actual, 
"scope-b") == NULL); + TEST_MSG("scope-b should have been dropped but found in output: %s", actual); + TEST_CHECK(strstr(actual, "event-b") == NULL); + TEST_MSG("event-b should have been dropped but found in output: %s", actual); + + flb_sds_destroy(actual); + flb_log_event_encoder_destroy(&encoder); +} + +/* + * Verify that a logs_body_key with a leading '$' prefix (record-accessor style, + * e.g. "$log") is correctly stripped and used as a plain map key during body + * extraction. The resulting body must contain the field value, not the full + * record map. + */ +void test_opentelemetry_logs_otlp_json_body_key_dollar_prefix() +{ + int ret; + int result; + flb_sds_t actual; + struct flb_time timestamp; + struct flb_log_event_encoder encoder; + struct flb_opentelemetry_otlp_logs_options options; + const char *body_keys[] = { "$log" }; + + timestamp.tm.tv_sec = 1640995200; + timestamp.tm.tv_nsec = 0; + + ret = flb_log_event_encoder_init(&encoder, FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + return; + } + + ret = flb_log_event_encoder_begin_record(&encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_set_timestamp(&encoder, ×tamp); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + /* record body: {"log": "extracted body text", "other": "ignored"} */ + ret = flb_log_event_encoder_append_body_values( + &encoder, + FLB_LOG_EVENT_CSTRING_VALUE("log"), + FLB_LOG_EVENT_CSTRING_VALUE("extracted body text")); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_append_body_values( + &encoder, + FLB_LOG_EVENT_CSTRING_VALUE("other"), + 
FLB_LOG_EVENT_CSTRING_VALUE("ignored")); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + ret = flb_log_event_encoder_commit_record(&encoder); + TEST_CHECK(ret == FLB_EVENT_ENCODER_SUCCESS); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + memset(&options, 0, sizeof(options)); + options.logs_require_otel_metadata = FLB_FALSE; + options.logs_body_keys = body_keys; + options.logs_body_key_count = 1; + options.logs_body_key_attributes = FLB_FALSE; + + actual = flb_opentelemetry_logs_to_otlp_json(encoder.output_buffer, + encoder.output_length, + &options, + &result); + TEST_CHECK(actual != NULL); + TEST_MSG("flb_opentelemetry_logs_to_otlp_json returned NULL"); + TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_JSON_SUCCESS); + TEST_MSG("result code: %d", result); + if (actual == NULL) { + flb_log_event_encoder_destroy(&encoder); + return; + } + + /* + * The body must be the string value of the "log" key, not the full record + * map serialised as a kvlistValue. + */ + TEST_CHECK(strstr(actual, "\"stringValue\":\"extracted body text\"") != NULL); + TEST_MSG("expected body stringValue not found in output: %s", actual); + + /* The output must NOT contain a kvlistValue wrapping the whole record */ + TEST_CHECK(strstr(actual, "\"kvlistValue\"") == NULL); + TEST_MSG("body was wrapped as kvlistValue (full map) instead of a string: %s", actual); + + flb_sds_destroy(actual); + flb_log_event_encoder_destroy(&encoder); +} + +/* + * Regression guard: verify that the protobuf encoding path + * (flb_opentelemetry_metrics_msgpack_to_otlp_proto) still works correctly + * after the JSON encoding feature was introduced. Uses the same input as + * test_opentelemetry_metrics_otlp_json_roundtrip. 
+ */
+void test_opentelemetry_metrics_otlp_json_protobuf_default()
+{
+    int ret;
+    int result; /* status code written by the proto encoder */
+    char *msgpack_buffer; /* msgpack encoding of the first decoded context */
+    char *input; /* OTLP/JSON metrics payload handed to the decoder */
+    size_t msgpack_size;
+    flb_sds_t actual; /* serialized request bytes returned by the proto encoder */
+    struct cfl_list contexts; /* list filled by flb_opentelemetry_metrics_json_to_cmt() */
+    struct cmt *context;
+    Opentelemetry__Proto__Collector__Metrics__V1__ExportMetricsServiceRequest *decoded;
+
+    input = /* one resource, one scope, one monotonic sum metric with a single data point */
+        "{\"resourceMetrics\":[{\"resource\":{\"attributes\":[{\"key\":\"service.name\","
+        "\"value\":{\"stringValue\":\"svc\"}}]},\"scopeMetrics\":[{\"scope\":{\"name\":\"scope-a\","
+        "\"version\":\"1.0.0\"},\"metrics\":[{\"name\":\"requests_total\",\"description\":\"count\","
+        "\"unit\":\"1\",\"sum\":{\"dataPoints\":[{\"attributes\":[{\"key\":\"method\","
+        "\"value\":{\"stringValue\":\"GET\"}}],\"timeUnixNano\":\"1704067201000000000\","
+        "\"startTimeUnixNano\":\"1704067200000000000\",\"asInt\":\"42\"}],"
+        "\"aggregationTemporality\":2,\"isMonotonic\":true}}]}]}]}";
+
+    cfl_list_init(&contexts);
+
+    ret = flb_opentelemetry_metrics_json_to_cmt(&contexts,
+                                                input,
+                                                strlen(input));
+    TEST_CHECK(ret == 0);
+    if (ret != 0 || cfl_list_is_empty(&contexts)) { /* decode failed or produced no context */
+        destroy_metrics_context_list(&contexts);
+        return;
+    }
+
+    context = cfl_list_entry_first(&contexts, struct cmt, _head); /* only the first context is encoded */
+    ret = cmt_encode_msgpack_create(context, &msgpack_buffer, &msgpack_size);
+    TEST_CHECK(ret == 0);
+    if (ret != 0) {
+        destroy_metrics_context_list(&contexts);
+        return;
+    }
+
+    actual = flb_opentelemetry_metrics_msgpack_to_otlp_proto(msgpack_buffer,
+                                                             msgpack_size,
+                                                             &result);
+    TEST_CHECK(actual != NULL);
+    TEST_MSG("flb_opentelemetry_metrics_msgpack_to_otlp_proto returned NULL");
+    TEST_CHECK(result == FLB_OPENTELEMETRY_OTLP_PROTO_SUCCESS);
+    TEST_MSG("result code: %d (expected FLB_OPENTELEMETRY_OTLP_PROTO_SUCCESS)", result);
+    if (actual == NULL) { /* bail out so a NULL buffer never reaches the unpacker */
+        flb_free(msgpack_buffer);
+        destroy_metrics_context_list(&contexts);
+        return;
+    }
+
+    /* Decode and verify basic structural correctness */
+    decoded =
+        opentelemetry__proto__collector__metrics__v1__export_metrics_service_request__unpack(
+            NULL, flb_sds_len(actual), (uint8_t *) actual);
+    TEST_CHECK(decoded != NULL);
+    TEST_MSG("protobuf unpack returned NULL");
+    if (decoded != NULL) { /* guard the dereference; the unpacked message is freed inside this branch */
+        TEST_CHECK(decoded->n_resource_metrics > 0);
+        TEST_MSG("expected at least one resourceMetric, got 0");
+        opentelemetry__proto__collector__metrics__v1__export_metrics_service_request__free_unpacked(
+            decoded, NULL);
+    }
+
+    cmt_encode_opentelemetry_destroy((cfl_sds_t) actual); /* NOTE(review): freed via the cmetrics encoder destructor, not flb_sds_destroy() - presumably the buffer comes from that encoder; confirm */
+    flb_free(msgpack_buffer);
+    destroy_metrics_context_list(&contexts);
+}
+
+/*
+ * Tests 4 and 5 (test_opentelemetry_logs_otlp_json_grpc_encoding_incompatibility
+ * and test_opentelemetry_logs_otlp_json_invalid_encoding) are NOT implemented
+ * as runnable tests.
+ *
+ * Reason: the validation logic for encoding=json vs grpc=on lives inside
+ * flb_opentelemetry_context_create() in opentelemetry_conf.c. That function
+ * requires a fully-initialized struct flb_output_instance (with config-map
+ * support, upstream creation, OAuth2, TLS, etc.) and a struct flb_config.
+ * Constructing those objects without spinning up the full Fluent Bit daemon is
+ * not feasible from an isolated unit test. The behaviour is covered at the
+ * integration level where a real plugin instance is started with conflicting
+ * configuration.
+ */ + /* Test list */ TEST_LIST = { { "hex_to_id", test_hex_to_id }, @@ -2766,5 +3351,19 @@ TEST_LIST = { test_opentelemetry_metrics_otlp_proto_roundtrip }, { "opentelemetry_metrics_msgpack_otlp_proto_merges_contexts", test_opentelemetry_metrics_msgpack_otlp_proto_merges_contexts }, + { "opentelemetry_logs_otlp_json_body_keys_array", + test_opentelemetry_logs_otlp_json_body_keys_array }, + { "opentelemetry_logs_otlp_json_body_key_attributes", + test_opentelemetry_logs_otlp_json_body_key_attributes }, + { "opentelemetry_metrics_otlp_json_add_label", + test_opentelemetry_metrics_otlp_json_add_label }, + { "opentelemetry_logs_otlp_json_max_resources_cap", + test_opentelemetry_logs_otlp_json_max_resources_cap }, + { "opentelemetry_logs_otlp_json_max_scopes_cap", + test_opentelemetry_logs_otlp_json_max_scopes_cap }, + { "opentelemetry_logs_otlp_json_body_key_dollar_prefix", + test_opentelemetry_logs_otlp_json_body_key_dollar_prefix }, + { "opentelemetry_metrics_otlp_json_protobuf_default", + test_opentelemetry_metrics_otlp_json_protobuf_default }, { 0 } };