PyFlink - укажите формат таблицы и обработайте вложенные строковые данные JSON

У меня есть объект данных JSON как таковой:

{
    "monitorId": 865,
    "deviceId": "94:54:93:49:96:13",
    "data": "{\"0001\":105.0,\"0002\":1.21,\"0003\":0.69,\"0004\":1.46,\"0005\":47.43,\"0006\":103.3}",
    "state": 2,
    "time": 1593687809180
}

Поле dataсам по себе является строкой объекта JSON. Как выразить эту схему в терминах API таблиц Flink? Я попытался создать UDF, который принимает строку JSON и выводит проанализированное содержимое. Однако я не могу найти способ заполнитьDataTypes.ROW объекты:

t_env.connect(
    Kafka()
        .version("universal")
        .topic(INPUT_TOPIC)
        .property("bootstrap.servers", PROD_KAFKA)
        .property("zookeeper.connect", PROD_ZOOKEEPER)
        .start_from_latest()
) \
    .with_format(
    Json()
        .json_schema(
        """
        {
    "type": "object",
    "properties": {
        "monitorId": {
            "type": "string"
        },
        "deviceId": {
            "type": "string"
        },
        "data": {
            "type": "string"
        },
        "state": {
            "type": "integer"
        },
        "time": {
            "type": "string"
        }
    }
}
    """
    )
) \
    .with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("deviceId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.STRING())
        .field("state", DataTypes.STRING())
) \
    .register_table_source(INPUT_TABLE)

t_env.connect(Kafka()
              .version("universal")
              .topic(OUTPUT_TOPIC)
              .property("bootstrap.servers", LOCAL_KAFKA)
              .property("zookeeper.connect", LOCAL_ZOOKEEPER)
              .start_from_latest()
              ) \
    .with_format(
    Json()
        .json_schema(
        """
         {
       "type": "object",
       "properties": {
           "monitorId": {
               "type": "string"
           },
           "data": {
               "type": "string"
           },
           "time": {
               "type": "string"
           }   
       }
   }
        """
    )
) \
    .with_schema(
    Schema()
        .field("monitorId", DataTypes.STRING())
        .field("time", DataTypes.STRING())
        .field("data", DataTypes.ROW([DataTypes.FIELD("feature1", DataTypes.STRING())]))
) \
    .register_table_sink(OUTPUT_TABLE)



class DataConverter(ScalarFunction):
    def eval(self, str_data):
        data = json.loads(str_data)
        return ? # <--- how do I populate the DataType.ROW with each individual value from data?

t_env.register_function("data_converter", udf(DataConverter(), input_types = [DataTypes.STRING()],
                                              result_type =
                                              DataTypes.ROW([
                                                  DataTypes.FIELD("feature1", DataTypes.STRING())
                                              ])))

t_env.from_path(INPUT_TABLE) \
    .select("monitorId, time, data_converter(data)") \
    .insert_into(OUTPUT_TABLE)

t_env.execute("IU pyflink job")


1 ответ

Если вы хотите, чтобы тип результата Python UDF был DataTypes.Row, вы можете использовать класс Python Row чтобы обернуть его. Rowпроисходит от кортежа. Вы можете использовать следующий код для его импорта:from pyflink.table.types import Row

Другие вопросы по тегам