diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index a51e5b7d4e6..014255a9c11 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -55,6 +55,7 @@ jobs: - "-DFLB_SIMD=Off" - "-DFLB_ARROW=On" - "-DFLB_COMPILER_STRICT_POINTER_TYPES=On" + - "-DFLB_AVRO_ENCODER=On -DCMAKE_POLICY_VERSION_MINIMUM=3.5" cmake_version: - "3.31.6" compiler: @@ -77,6 +78,10 @@ jobs: compiler: cc: clang cxx: clang++ + - flb_option: "-DFLB_AVRO_ENCODER=On -DCMAKE_POLICY_VERSION_MINIMUM=3.5" + compiler: + cc: clang + cxx: clang++ permissions: contents: read steps: diff --git a/src/flb_avro.c b/src/flb_avro.c index 6e978edf7d9..09cea7ea1ad 100644 --- a/src/flb_avro.c +++ b/src/flb_avro.c @@ -18,6 +18,9 @@ */ #include +#include +#include +#include #include #include @@ -28,14 +31,50 @@ #include #include -static inline int do_avro(bool call, const char *msg) { +static inline int do_avro(bool call, const char *msg) +{ if (call) { - flb_error("%s:\n %s\n", msg, avro_strerror()); - return FLB_FALSE; + flb_error("%s:\n %s\n", msg, avro_strerror()); + return FLB_FALSE; } return FLB_TRUE; } +static int set_avro_integer(avro_value_t *val, int64_t i64, uint64_t u64, int positive) +{ + avro_type_t type; + + type = avro_value_get_type(val); + + if (type == AVRO_INT64) { + if (positive && u64 > INT64_MAX) { + flb_error("positive integer value exceeds Avro long range: %" PRIu64, u64); + return FLB_FALSE; + } + + return do_avro(avro_value_set_long(val, positive ? (int64_t) u64 : i64), + positive ? "failed on posint" : "failed on negint"); + } + + if (type == AVRO_INT32) { + if (positive && u64 > INT32_MAX) { + flb_error("positive integer value exceeds Avro int range: %" PRIu64, u64); + return FLB_FALSE; + } + else if (!positive && (i64 < INT32_MIN || i64 > INT32_MAX)) { + flb_error("negative integer value exceeds Avro int range: %" PRIi64, i64); + return FLB_FALSE; + } + + return do_avro(avro_value_set_int(val, positive ? (int32_t) u64 : (int32_t) i64), + positive ? "failed on posint" : "failed on negint"); + } + + flb_error("integer value cannot be assigned to Avro type %d", type); + + return FLB_FALSE; +} + avro_value_iface_t *flb_avro_init(avro_value_t *aobject, char *json, size_t json_len, avro_schema_t *aschema) { @@ -85,33 +124,20 @@ int msgpack2avro(avro_value_t *val, msgpack_object *o) #if defined(PRIu64) // msgpack_pack_fix_uint64 flb_debug("got a posint: %" PRIu64 "\n", o->via.u64); - ret = do_avro(avro_value_set_int(val, o->via.u64), "failed on posint"); #else - if (o.via.u64 > ULONG_MAX) - flb_warn("over \"%lu\"", ULONG_MAX); - ret = do_avro(avro_value_set_int(val, ULONG_MAX), "failed on posint"); - else - flb_debug("got a posint: %lu\n", (unsigned long)o->via.u64); - ret = do_avro(avro_value_set_int(val, o->via.u64), "failed on posint"); + flb_debug("got a posint: %lu\n", (unsigned long) o->via.u64); #endif + ret = set_avro_integer(val, 0, o->via.u64, FLB_TRUE); break; case MSGPACK_OBJECT_NEGATIVE_INTEGER: #if defined(PRIi64) flb_debug("got a negint: %" PRIi64 "\n", o->via.i64); - ret = do_avro(avro_value_set_int(val, o->via.i64), "failed on negint"); #else - if (o->via.i64 > LONG_MAX) - flb_warn("over +\"%ld\"", LONG_MAX); - ret = do_avro(avro_value_set_int(val, LONG_MAX), "failed on negint"); - else if (o->via.i64 < LONG_MIN) - flb_warn("under -\"%ld\"", LONG_MIN); - ret = do_avro(avro_value_set_int(val, LONG_MIN), "failed on negint"); - else - flb_debug("got a negint: %ld\n", (signed long)o->via.i64); - ret = do_avro(avro_value_set_int(val, o->via.i64), "failed on negint"); + flb_debug("got a negint: %ld\n", (signed long) o->via.i64); #endif + ret = set_avro_integer(val, o->via.i64, 0, FLB_FALSE); break; case MSGPACK_OBJECT_FLOAT32: @@ -207,6 +233,10 @@ int msgpack2avro(avro_value_t *val, msgpack_object *o) ret = flb_msgpack_to_avro(&element, &p->val); flb_sds_destroy(key); + + if (ret == FLB_FALSE) { + goto msg2avro_end; + } } } break; diff --git a/tests/internal/avro.c b/tests/internal/avro.c index 45f0734391c..7fca32c6c32 100644 --- a/tests/internal/avro.c +++ b/tests/internal/avro.c @@ -1,5 +1,6 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ #include +#include #include #include #include @@ -24,38 +25,44 @@ const char JSON_SINGLE_MAP_001_SCHEMA[] = {\"type\": \"array\", \"items\":\ {\"type\": \"map\",\"values\": \"int\"}}}]}"; -msgpack_unpacked test_init(avro_value_t *aobject, avro_schema_t *aschema, const char *json_schema, const char *json_data) { - char *out_buf; +msgpack_unpacked test_init(avro_value_t *aobject, avro_schema_t *aschema, + const char *json_schema, const char *json_data, + char **out_buf) +{ size_t out_size; int root_type; + size_t len; + char *data; + avro_value_iface_t *aclass; + msgpack_unpacked msg; - avro_value_iface_t *aclass = flb_avro_init(aobject, (char *)json_schema, strlen(json_schema), aschema); + aclass = flb_avro_init(aobject, (char *)json_schema, strlen(json_schema), aschema); TEST_CHECK(aclass != NULL); - char *data = mk_file_to_buffer(json_data); + data = mk_file_to_buffer(json_data); TEST_CHECK(data != NULL); - size_t len = strlen(data); + len = strlen(data); - TEST_CHECK(flb_pack_json(data, len, &out_buf, &out_size, &root_type, NULL) == 0); + TEST_CHECK(flb_pack_json(data, len, out_buf, &out_size, &root_type, NULL) == 0); - msgpack_unpacked msg; msgpack_unpacked_init(&msg); - TEST_CHECK(msgpack_unpack_next(&msg, out_buf, out_size, NULL) == MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(msgpack_unpack_next(&msg, *out_buf, out_size, NULL) == MSGPACK_UNPACK_SUCCESS); avro_value_iface_decref(aclass); flb_free(data); - flb_free(out_buf); return msg; } /* Unpack msgpack per avro schema */ void test_unpack_to_avro() { + char *out_buf = NULL; avro_value_t aobject; avro_schema_t aschema; + msgpack_unpacked mp; - msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA, AVRO_SINGLE_MAP1); + mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA, AVRO_SINGLE_MAP1, &out_buf); msgpack_object_print(stderr, mp.data); flb_msgpack_to_avro(&aobject, &mp.data); @@ -141,6 +148,7 @@ void test_unpack_to_avro() avro_value_decref(&aobject); avro_schema_decref(aschema); msgpack_unpacked_destroy(&mp); + flb_free(out_buf); } void test_parse_reordered_schema() @@ -154,11 +162,12 @@ void test_parse_reordered_schema() int i=0; for (i=0; schemas[i] != NULL ; i++) { - + char *out_buf = NULL; avro_value_t aobject = {0}; avro_schema_t aschema = {0}; + msgpack_unpacked msg; - msgpack_unpacked msg = test_init(&aobject, &aschema, schemas[i], AVRO_MULTILINE_JSON); + msg = test_init(&aobject, &aschema, schemas[i], AVRO_MULTILINE_JSON, &out_buf); msgpack_object_print(stderr, msg.data); @@ -226,6 +235,7 @@ void test_parse_reordered_schema() avro_schema_decref(aschema); msgpack_unpacked_destroy(&msg); avro_value_decref(&aobject); + flb_free(out_buf); } } @@ -265,6 +275,151 @@ void test_msgpack2avro() msgpack_zone_destroy(&mempool); msgpack_sbuffer_destroy(&sbuf); } + +const char JSON_INT64_SCHEMA[] = +"{\"type\":\"record\"," +"\"name\":\"Int64Record\"," +"\"fields\":[" +"{\"name\":\"positive\",\"type\":\"long\"}," +"{\"name\":\"negative\",\"type\":\"long\"}]}"; + +const char JSON_INT_SCHEMA[] = +"{\"type\":\"record\"," +"\"name\":\"IntRecord\"," +"\"fields\":[" +"{\"name\":\"bad\",\"type\":\"int\"}]}"; + +const char JSON_INT_MULTI_FIELD_SCHEMA[] = +"{\"type\":\"record\"," +"\"name\":\"IntMultiFieldRecord\"," +"\"fields\":[" +"{\"name\":\"first\",\"type\":\"int\"}," +"{\"name\":\"bad\",\"type\":\"int\"}," +"{\"name\":\"last\",\"type\":\"string\"}]}"; + +void test_msgpack_to_avro_int64() +{ + int64_t positive_expected = 4294967296LL; + int64_t negative_expected = -2147483649LL; + int64_t actual = 0; + avro_value_t aobject; + avro_value_t test_value; + avro_schema_t aschema; + avro_value_iface_t *aclass; + msgpack_sbuffer sbuf; + msgpack_packer pk; + msgpack_unpacked msg; + + aclass = flb_avro_init(&aobject, (char *) JSON_INT64_SCHEMA, + strlen(JSON_INT64_SCHEMA), &aschema); + TEST_CHECK(aclass != NULL); + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&pk, 2); + msgpack_pack_str(&pk, 8); + msgpack_pack_str_body(&pk, "positive", 8); + msgpack_pack_uint64(&pk, (uint64_t) positive_expected); + msgpack_pack_str(&pk, 8); + msgpack_pack_str_body(&pk, "negative", 8); + msgpack_pack_int64(&pk, negative_expected); + + msgpack_unpacked_init(&msg); + TEST_CHECK(msgpack_unpack_next(&msg, sbuf.data, sbuf.size, NULL) == + MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(flb_msgpack_to_avro(&aobject, &msg.data) == FLB_TRUE); + + TEST_CHECK(avro_value_get_by_name(&aobject, "positive", &test_value, NULL) == 0); + TEST_CHECK(avro_value_get_long(&test_value, &actual) == 0); + TEST_CHECK(actual == positive_expected); + + TEST_CHECK(avro_value_get_by_name(&aobject, "negative", &test_value, NULL) == 0); + TEST_CHECK(avro_value_get_long(&test_value, &actual) == 0); + TEST_CHECK(actual == negative_expected); + + msgpack_unpacked_destroy(&msg); + msgpack_sbuffer_destroy(&sbuf); + avro_value_decref(&aobject); + avro_value_iface_decref(aclass); + avro_schema_decref(aschema); +} + +void test_msgpack_to_avro_int_range() +{ + uint64_t too_big = 2147483648ULL; + avro_value_t aobject; + avro_schema_t aschema; + avro_value_iface_t *aclass; + msgpack_sbuffer sbuf; + msgpack_packer pk; + msgpack_unpacked msg; + + aclass = flb_avro_init(&aobject, (char *) JSON_INT_SCHEMA, + strlen(JSON_INT_SCHEMA), &aschema); + TEST_CHECK(aclass != NULL); + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&pk, 1); + msgpack_pack_str(&pk, 3); + msgpack_pack_str_body(&pk, "bad", 3); + msgpack_pack_uint64(&pk, too_big); + + msgpack_unpacked_init(&msg); + TEST_CHECK(msgpack_unpack_next(&msg, sbuf.data, sbuf.size, NULL) == + MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(flb_msgpack_to_avro(&aobject, &msg.data) == FLB_FALSE); + + msgpack_unpacked_destroy(&msg); + msgpack_sbuffer_destroy(&sbuf); + avro_value_decref(&aobject); + avro_value_iface_decref(aclass); + avro_schema_decref(aschema); +} + +void test_msgpack_to_avro_int_range_multi_field() +{ + uint64_t too_big = 2147483648ULL; + avro_value_t aobject; + avro_schema_t aschema; + avro_value_iface_t *aclass; + msgpack_sbuffer sbuf; + msgpack_packer pk; + msgpack_unpacked msg; + + aclass = flb_avro_init(&aobject, (char *) JSON_INT_MULTI_FIELD_SCHEMA, + strlen(JSON_INT_MULTI_FIELD_SCHEMA), &aschema); + TEST_CHECK(aclass != NULL); + + msgpack_sbuffer_init(&sbuf); + msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&pk, 3); + msgpack_pack_str(&pk, 5); + msgpack_pack_str_body(&pk, "first", 5); + msgpack_pack_int(&pk, 1); + msgpack_pack_str(&pk, 3); + msgpack_pack_str_body(&pk, "bad", 3); + msgpack_pack_uint64(&pk, too_big); + msgpack_pack_str(&pk, 4); + msgpack_pack_str_body(&pk, "last", 4); + msgpack_pack_str(&pk, 1); + msgpack_pack_str_body(&pk, "x", 1); + + msgpack_unpacked_init(&msg); + TEST_CHECK(msgpack_unpack_next(&msg, sbuf.data, sbuf.size, NULL) == + MSGPACK_UNPACK_SUCCESS); + TEST_CHECK(flb_msgpack_to_avro(&aobject, &msg.data) == FLB_FALSE); + + msgpack_unpacked_destroy(&msg); + msgpack_sbuffer_destroy(&sbuf); + avro_value_decref(&aobject); + avro_value_iface_decref(aclass); + avro_schema_decref(aschema); +} + const char JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION[] = "{\"type\":\"record\",\ \"name\":\"Map001\",\ @@ -282,10 +437,12 @@ const char JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION[] = {\"type\": \"map\",\"values\": \"int\"}}}]}"; void test_union_type_sanity() { + char *out_buf = NULL; avro_value_t aobject; avro_schema_t aschema; + msgpack_unpacked msg; - msgpack_unpacked msg = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1); + msg = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1, &out_buf); msgpack_object_print(stderr, msg.data); flb_msgpack_to_avro(&aobject, &msg.data); @@ -341,14 +498,17 @@ void test_union_type_sanity() avro_value_decref(&aobject); avro_schema_decref(aschema); msgpack_unpacked_destroy(&msg); + flb_free(out_buf); } void test_union_type_branches() { + char *out_buf = NULL; avro_value_t aobject; avro_schema_t aschema; + msgpack_unpacked mp; - msgpack_unpacked mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1); + mp = test_init(&aobject, &aschema, JSON_SINGLE_MAP_001_SCHEMA_WITH_UNION, AVRO_SINGLE_MAP1, &out_buf); flb_msgpack_to_avro(&aobject, &mp.data); @@ -371,11 +531,16 @@ void test_union_type_branches() avro_value_decref(&aobject); avro_schema_decref(aschema); msgpack_unpacked_destroy(&mp); + flb_free(out_buf); } TEST_LIST = { /* Avro */ { "msgpack_to_avro_basic", test_unpack_to_avro}, { "test_parse_reordered_schema", test_parse_reordered_schema}, + { "test_msgpack_to_avro_int64", test_msgpack_to_avro_int64}, + { "test_msgpack_to_avro_int_range", test_msgpack_to_avro_int_range}, + { "test_msgpack_to_avro_int_range_multi_field", + test_msgpack_to_avro_int_range_multi_field}, { "test_union_type_sanity", test_union_type_sanity}, { "test_union_type_branches", test_union_type_branches}, { 0 }