Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ struct flb_opentelemetry_otlp_logs_options {
const char **logs_body_keys;
size_t logs_body_key_count;
int logs_body_key_attributes;
int max_resources; /* 0 = unlimited */
int max_scopes; /* 0 = unlimited, per resource */
};

/* Backward-compatible alias for older external callers. */
Expand Down
171 changes: 159 additions & 12 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_ra_key.h>
#include <fluent-bit/flb_opentelemetry.h>

#include <cfl/cfl.h>
#include <fluent-otel-proto/fluent-otel.h>
Expand Down Expand Up @@ -193,7 +194,8 @@ static int opentelemetry_check_grpc_status(struct opentelemetry_context *ctx,
int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
const void *body, size_t body_len,
const char *tag, int tag_len,
const char *uri)
const char *uri,
const char *content_type)
{
size_t final_body_len;
void *final_body;
Expand Down Expand Up @@ -284,8 +286,8 @@ int opentelemetry_legacy_post(struct opentelemetry_context *ctx,
flb_http_add_header(c,
FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME,
sizeof(FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME) - 1,
FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL,
sizeof(FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL) - 1);
content_type,
strlen(content_type));

/* Basic Auth headers */
if (ctx->http_user != NULL &&
Expand Down Expand Up @@ -419,7 +421,8 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
const void *body, size_t body_len,
const char *tag, int tag_len,
const char *http_uri,
const char *grpc_uri)
const char *grpc_uri,
const char *content_type)
{
flb_sds_t oauth2_token;
const char *compression_algorithm;
Expand All @@ -438,7 +441,8 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
return opentelemetry_legacy_post(ctx,
body, body_len,
tag, tag_len,
http_uri);
http_uri,
content_type);
}

compression_algorithm = NULL;
Expand Down Expand Up @@ -542,8 +546,7 @@ int opentelemetry_post(struct opentelemetry_context *ctx,

result = flb_http_request_set_parameters(request,
FLB_HTTP_CLIENT_ARGUMENT_URI(http_uri),
FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(
FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL),
FLB_HTTP_CLIENT_ARGUMENT_CONTENT_TYPE(content_type),
FLB_HTTP_CLIENT_ARGUMENT_BODY(body,
body_len,
compression_algorithm));
Expand Down Expand Up @@ -730,6 +733,104 @@ static int opentelemetry_format_test(struct flb_config *config,
return 0;
}

static int process_metrics_json(struct flb_event_chunk *event_chunk,
struct opentelemetry_context *ctx)
{
int ret;
int result;
int json_result;
int c = 0;
int first_entry;
int ok;
size_t off = 0;
size_t rendered_len;
size_t inner_len;
struct cmt *cmt;
flb_sds_t rendered;
flb_sds_t output;

/*
* Length of the {"resourceMetrics":[ wrapper that
* flb_opentelemetry_metrics_to_otlp_json emits around the array content.
* We strip it from each per-context render and accumulate the inner
* elements into a single merged document, avoiding a second msgpack
* decode pass.
*/
static const size_t prefix_len = sizeof("{\"resourceMetrics\":[") - 1;
static const size_t suffix_len = sizeof("]}") - 1;

ok = CMT_DECODE_MSGPACK_SUCCESS;
first_entry = FLB_TRUE;

output = flb_sds_create("{\"resourceMetrics\":[");
if (!output) {
flb_plg_error(ctx->ins, "could not allocate metrics output buffer");
return FLB_RETRY;
}

while ((ret = cmt_decode_msgpack_create(&cmt,
(char *) event_chunk->data,
event_chunk->size, &off)) == ok) {
append_labels(ctx, cmt);

rendered = flb_opentelemetry_metrics_to_otlp_json(cmt, &json_result);
cmt_destroy(cmt);

if (rendered == NULL) {
flb_plg_error(ctx->ins,
"failed to encode metrics as OTLP JSON (result=%d)",
json_result);
flb_sds_destroy(output);
return FLB_ERROR;
}

rendered_len = flb_sds_len(rendered);
if (rendered_len > prefix_len + suffix_len) {
inner_len = rendered_len - prefix_len - suffix_len;

if (!first_entry && flb_sds_cat_safe(&output, ",", 1) != 0) {
flb_sds_destroy(rendered);
flb_sds_destroy(output);
return FLB_RETRY;
}

if (flb_sds_cat_safe(&output,
rendered + prefix_len,
(int) inner_len) != 0) {
flb_sds_destroy(rendered);
flb_sds_destroy(output);
return FLB_RETRY;
}

first_entry = FLB_FALSE;
}

flb_sds_destroy(rendered);
c++;
}

if (ret != CMT_DECODE_MSGPACK_INSUFFICIENT_DATA || c == 0) {
flb_plg_error(ctx->ins, "error decoding metrics msgpack encoded context");
flb_sds_destroy(output);
return FLB_ERROR;
}

if (flb_sds_cat_safe(&output, "]}", 2) != 0) {
flb_sds_destroy(output);
return FLB_RETRY;
}

result = opentelemetry_post(ctx,
output, flb_sds_len(output),
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->metrics_uri_sanitized,
ctx->grpc_metrics_uri,
FLB_OPENTELEMETRY_MIME_JSON_LITERAL);
flb_sds_destroy(output);
return result;
}

static int process_metrics(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out1_flush,
struct flb_input_instance *ins, void *out_context,
Expand All @@ -748,6 +849,10 @@ static int process_metrics(struct flb_event_chunk *event_chunk,

/* Initialize vars */
ctx = out_context;

if (ctx->use_json_encoding) {
return process_metrics_json(event_chunk, ctx);
}
Comment on lines +853 to +855
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve add_label behavior for JSON metric encoding

This early return bypasses the existing metrics path that calls append_labels(ctx, cmt) before encoding, so configured add_label entries are silently dropped whenever encoding=json is used. That changes exported metric content compared to protobuf mode and regresses a documented plugin option.

Useful? React with 👍 / 👎.

ok = CMT_DECODE_MSGPACK_SUCCESS;
result = FLB_OK;

Expand Down Expand Up @@ -800,7 +905,8 @@ static int process_metrics(struct flb_event_chunk *event_chunk,
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->metrics_uri_sanitized,
ctx->grpc_metrics_uri);
ctx->grpc_metrics_uri,
FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL);

/* Debug http_post() result statuses */
if (result == FLB_OK) {
Expand Down Expand Up @@ -830,6 +936,31 @@ static int process_metrics(struct flb_event_chunk *event_chunk,
return result;
}

static int process_traces_json(struct flb_event_chunk *event_chunk,
struct opentelemetry_context *ctx)
{
int result;
flb_sds_t json_payload;

json_payload = flb_opentelemetry_traces_msgpack_to_otlp_json(event_chunk->data,
event_chunk->size,
&result);
if (json_payload == NULL) {
flb_plg_error(ctx->ins, "failed to encode traces as OTLP JSON (result=%d)", result);
return FLB_ERROR;
}

result = opentelemetry_post(ctx,
json_payload, flb_sds_len(json_payload),
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->traces_uri_sanitized,
ctx->grpc_traces_uri,
FLB_OPENTELEMETRY_MIME_JSON_LITERAL);
flb_sds_destroy(json_payload);
return result;
}

static int process_traces(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
Expand All @@ -847,6 +978,10 @@ static int process_traces(struct flb_event_chunk *event_chunk,
ctx = out_context;
result = FLB_OK;

if (ctx->use_json_encoding) {
return process_traces_json(event_chunk, ctx);
}

buf = flb_sds_create_size(event_chunk->size);
if (!buf) {
flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
Expand Down Expand Up @@ -891,7 +1026,8 @@ static int process_traces(struct flb_event_chunk *event_chunk,
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->traces_uri_sanitized,
ctx->grpc_traces_uri);
ctx->grpc_traces_uri,
FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL);

/* Debug http_post() result statuses */
if (result == FLB_OK) {
Expand Down Expand Up @@ -973,7 +1109,8 @@ static int process_profiles(struct flb_event_chunk *event_chunk,
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->profiles_uri_sanitized,
ctx->grpc_profiles_uri);
ctx->grpc_profiles_uri,
FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL);

/* Debug http_post() result statuses */
if (result == FLB_OK) {
Expand Down Expand Up @@ -1070,7 +1207,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "grpc", "off",
0, FLB_TRUE, offsetof(struct opentelemetry_context, enable_grpc_flag),
"Enable, disable or force gRPC usage. Accepted values : on, off, auto"
"Enable gRPC transport (OTLP/gRPC). When enabled, HTTP/2 is automatically activated. Accepted values: on, off"
},
{
FLB_CONFIG_MAP_STR, "proxy", NULL,
Expand Down Expand Up @@ -1123,6 +1260,15 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size),
"Set the maximum number of log records to be flushed at a time"
},
{
FLB_CONFIG_MAP_STR, "encoding", "protobuf",
0, FLB_FALSE, 0,
"Set the encoding for exports (logs, traces, metrics). Accepted values: 'protobuf' (default, "
"Content-Type: application/x-protobuf) or 'json' (Content-Type: application/json). "
"Note: 'json' only applies to OTLP/HTTP transport; OTLP/gRPC (grpc=on) always uses protobuf "
"per the OpenTelemetry specification and will reject this combination at startup. "
"Note: profiles always use protobuf (no JSON encoder exists)."
},
{
FLB_CONFIG_MAP_STR, "compress", NULL,
0, FLB_FALSE, 0,
Expand Down Expand Up @@ -1160,7 +1306,8 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "logs_body_key", NULL,
FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context, log_body_key_list_str),
"Specify an optional HTTP URI for the target OTel endpoint."
"Record accessor pattern(s) used to populate the OTLP log body. The first matching key wins. "
"May be specified multiple times. Defaults to '$log' then '$message'."
},

{
Expand Down
7 changes: 6 additions & 1 deletion plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#define FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME "Content-Type"
#define FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL "application/x-protobuf"
#define FLB_OPENTELEMETRY_MIME_JSON_LITERAL "application/json"

/*
* This lets you send log records in batches instead of a request per log record
Expand Down Expand Up @@ -190,6 +191,9 @@ struct opentelemetry_context {
/* compression: zstd */
int compress_zstd;

/* encoding: 0 = protobuf (default), 1 = JSON */
int use_json_encoding;

/* FLB/OTLP Record accessor patterns */
struct flb_record_accessor *ra_meta_schema;
struct flb_record_accessor *ra_meta_resource_id;
Expand Down Expand Up @@ -217,5 +221,6 @@ int opentelemetry_post(struct opentelemetry_context *ctx,
const void *body, size_t body_len,
const char *tag, int tag_len,
const char *http_uri,
const char *grpc_uri);
const char *grpc_uri,
const char *content_type);
#endif
38 changes: 38 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,44 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
}
}

ctx->use_json_encoding = FLB_FALSE;
tmp = flb_output_get_property("encoding", ins);
if (tmp) {
if (strcasecmp(tmp, "json") == 0) {
ctx->use_json_encoding = FLB_TRUE;
}
Comment on lines +554 to +556
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reject JSON encoding when gRPC transport is enabled

Allowing encoding=json here breaks OTLP/gRPC exports: ctx->use_json_encoding drives the logs/metrics/traces JSON paths, but opentelemetry_post() still wraps that payload in gRPC framing (application/grpc) when grpc on is active. OTLP/gRPC expects protobuf messages, so this combination sends invalid wire payloads and causes export failures for gRPC users who enable the new option.

Useful? React with 👍 / 👎.

else if (strcasecmp(tmp, "protobuf") != 0) {
flb_plg_error(ctx->ins,
"Unknown encoding value '%s'. "
"Accepted values: 'protobuf' (default) or 'json'", tmp);
flb_opentelemetry_context_destroy(ctx);
return NULL;
}
}

/*
* Per the OpenTelemetry specification, OTLP/gRPC exclusively uses protobuf
* encoding on the wire. Reject the combination early so the user gets a
* clear error rather than silent data corruption.
* Reference: https://opentelemetry.io/docs/specs/otlp/#otlpgrpc
*/
if (ctx->use_json_encoding && ctx->enable_grpc_flag) {
flb_plg_error(ctx->ins,
"encoding=json is incompatible with grpc=on. "
"OTLP/gRPC only supports protobuf encoding per the "
"OpenTelemetry specification. "
"Use encoding=json with HTTP transport (grpc=off) instead.");
flb_opentelemetry_context_destroy(ctx);
return NULL;
}

if (ctx->use_json_encoding) {
flb_plg_warn(ctx->ins,
"encoding=json does not apply to profiles: profiles "
"will always be exported as protobuf "
"(no JSON encoder exists for cprofiles)");
}

ctx->ra_observed_timestamp_metadata = flb_ra_create((char*)ctx->logs_observed_timestamp_metadata_key,
FLB_FALSE);
if (ctx->ra_observed_timestamp_metadata == NULL) {
Expand Down
Loading
Loading