PyFlink - укажите формат таблицы и обработайте вложенные строковые данные JSON
У меня есть объект данных JSON как таковой:
{
"monitorId": 865,
"deviceId": "94:54:93:49:96:13",
"data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
"state": 2,
"time": 1593687809180
}
Поле data
сам по себе является строкой объекта JSON. Как выразить эту схему в терминах API таблиц Flink? Я попытался создать UDF, который принимает строку JSON и выводит проанализированное содержимое. Однако я не могу найти способ заполнитьDataTypes.ROW
объекты:
t_env.connect(
Kafka()
.version("universal")
.topic(INPUT_TOPIC)
.property("bootstrap.servers", PROD_KAFKA)
.property("zookeeper.connect", PROD_ZOOKEEPER)
.start_from_latest()
) \
.with_format(
Json()
.json_schema(
"""
{
"type": "object",
"properties": {
"monitorId": {
"type": "string"
},
"deviceId": {
"type": "string"
},
"data": {
"type": "string"
},
"state": {
"type": "integer"
},
"time": {
"type": "string"
}
}
}
"""
)
) \
.with_schema(
Schema()
.field("monitorId", DataTypes.STRING())
.field("deviceId", DataTypes.STRING())
.field("time", DataTypes.STRING())
.field("data", DataTypes.STRING())
.field("state", DataTypes.STRING())
) \
.register_table_source(INPUT_TABLE)
t_env.connect(Kafka()
.version("universal")
.topic(OUTPUT_TOPIC)
.property("bootstrap.servers", LOCAL_KAFKA)
.property("zookeeper.connect", LOCAL_ZOOKEEPER)
.start_from_latest()
) \
.with_format(
Json()
.json_schema(
"""
{
"type": "object",
"properties": {
"monitorId": {
"type": "string"
},
"data": {
"type": "string"
},
"time": {
"type": "string"
}
}
}
"""
)
) \
.with_schema(
Schema()
.field("monitorId", DataTypes.STRING())
.field("time", DataTypes.STRING())
.field("data", DataTypes.ROW([DataTypes.FIELD("feature1", DataTypes.STRING())]))
) \
.register_table_sink(OUTPUT_TABLE)
class DataConverter(ScalarFunction):
def eval(self, str_data):
data = json.loads(str_data)
return ? # <--- how do I populate the DataType.ROW with each individual value from data?
t_env.register_function("data_converter", udf(DataConverter(), input_types = [DataTypes.STRING()],
result_type =
DataTypes.ROW([
DataTypes.FIELD("feature1", DataTypes.STRING())
])))
t_env.from_path(INPUT_TABLE) \
.select("monitorId, time, data_converter(data)") \
.insert_into(OUTPUT_TABLE)
t_env.execute("IU pyflink job")
1 ответ
Если вы хотите, чтобы тип результата Python UDF был DataTypes.Row
, вы можете использовать класс Python Row
чтобы обернуть его. Row
происходит от кортежа. Вы можете использовать следующий код для его импорта:from pyflink.table.types import Row