Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ jobs:
- "-DFLB_SIMD=Off"
- "-DFLB_ARROW=On"
- "-DFLB_COMPILER_STRICT_POINTER_TYPES=On"
- "-DFLB_AVRO_ENCODER=On -DCMAKE_POLICY_VERSION_MINIMUM=3.5"
cmake_version:
- "3.31.6"
compiler:
Expand All @@ -77,6 +78,10 @@ jobs:
compiler:
cc: clang
cxx: clang++
- flb_option: "-DFLB_AVRO_ENCODER=On -DCMAKE_POLICY_VERSION_MINIMUM=3.5"
compiler:
cc: clang
cxx: clang++
permissions:
contents: read
steps:
Expand Down
70 changes: 50 additions & 20 deletions src/flb_avro.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/

#include <assert.h>
#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

Expand All @@ -28,14 +31,50 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_avro.h>

static inline int do_avro(bool call, const char *msg) {
static inline int do_avro(bool call, const char *msg)
{
if (call) {
flb_error("%s:\n %s\n", msg, avro_strerror());
return FLB_FALSE;
flb_error("%s:\n %s\n", msg, avro_strerror());
return FLB_FALSE;
}
return FLB_TRUE;
}

static int set_avro_integer(avro_value_t *val, int64_t i64, uint64_t u64, int positive)
{
avro_type_t type;

type = avro_value_get_type(val);

if (type == AVRO_INT64) {
if (positive && u64 > INT64_MAX) {
flb_error("positive integer value exceeds Avro long range: %" PRIu64, u64);
return FLB_FALSE;
}

return do_avro(avro_value_set_long(val, positive ? (int64_t) u64 : i64),
positive ? "failed on posint" : "failed on negint");
}

if (type == AVRO_INT32) {
if (positive && u64 > INT32_MAX) {
flb_error("positive integer value exceeds Avro int range: %" PRIu64, u64);
return FLB_FALSE;
Comment on lines +60 to +62
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate integer conversion failures from arrays

When this new range check fires for an element inside an Avro array, msgpack2avro does not stop or preserve the failure in the array case: it appends the element, sets ret to FLB_FALSE, then continues and can overwrite ret with a later successful element. For an int array such as [2147483648, 1], flb_msgpack_to_avro can return success even though the first element could not be assigned, so the encoder may emit a record with an unset/defaulted array element instead of rejecting the invalid input.

Useful? React with 👍 / 👎.

}
else if (!positive && (i64 < INT32_MIN || i64 > INT32_MAX)) {
flb_error("negative integer value exceeds Avro int range: %" PRIi64, i64);
return FLB_FALSE;
}

return do_avro(avro_value_set_int(val, positive ? (int32_t) u64 : (int32_t) i64),
positive ? "failed on posint" : "failed on negint");
}

flb_error("integer value cannot be assigned to Avro type %d", type);

return FLB_FALSE;
}

avro_value_iface_t *flb_avro_init(avro_value_t *aobject, char *json, size_t json_len, avro_schema_t *aschema)
{

Expand Down Expand Up @@ -85,33 +124,20 @@ int msgpack2avro(avro_value_t *val, msgpack_object *o)
#if defined(PRIu64)
// msgpack_pack_fix_uint64
flb_debug("got a posint: %" PRIu64 "\n", o->via.u64);
ret = do_avro(avro_value_set_int(val, o->via.u64), "failed on posint");
#else
if (o.via.u64 > ULONG_MAX)
flb_warn("over \"%lu\"", ULONG_MAX);
ret = do_avro(avro_value_set_int(val, ULONG_MAX), "failed on posint");
else
flb_debug("got a posint: %lu\n", (unsigned long)o->via.u64);
ret = do_avro(avro_value_set_int(val, o->via.u64), "failed on posint");
flb_debug("got a posint: %lu\n", (unsigned long) o->via.u64);
#endif
ret = set_avro_integer(val, 0, o->via.u64, FLB_TRUE);

break;

case MSGPACK_OBJECT_NEGATIVE_INTEGER:
#if defined(PRIi64)
flb_debug("got a negint: %" PRIi64 "\n", o->via.i64);
ret = do_avro(avro_value_set_int(val, o->via.i64), "failed on negint");
#else
if (o->via.i64 > LONG_MAX)
flb_warn("over +\"%ld\"", LONG_MAX);
ret = do_avro(avro_value_set_int(val, LONG_MAX), "failed on negint");
else if (o->via.i64 < LONG_MIN)
flb_warn("under -\"%ld\"", LONG_MIN);
ret = do_avro(avro_value_set_int(val, LONG_MIN), "failed on negint");
else
flb_debug("got a negint: %ld\n", (signed long)o->via.i64);
ret = do_avro(avro_value_set_int(val, o->via.i64), "failed on negint");
flb_debug("got a negint: %ld\n", (signed long) o->via.i64);
#endif
ret = set_avro_integer(val, o->via.i64, 0, FLB_FALSE);
break;

case MSGPACK_OBJECT_FLOAT32:
Expand Down Expand Up @@ -207,6 +233,10 @@ int msgpack2avro(avro_value_t *val, msgpack_object *o)
ret = flb_msgpack_to_avro(&element, &p->val);

flb_sds_destroy(key);

if (ret == FLB_FALSE) {
goto msg2avro_end;
}
}
}
break;
Expand Down
Loading
Loading