Присоединение схемы JSON к записям потока KSQL
Я использую KSQL, и до сих пор он работал отлично. Но теперь я хотел бы передать вывод в BigQuery через Kafka Connect, и мне нужно присоединить схему JSON. У меня проблемы с выяснением, как это сделать. Вот мой запрос:
CREATE STREAM tweets_original (
CreatedAt BIGINT,
Id BIGINT,
Text VARCHAR,
Source VARCHAR,
GeoLocation VARCHAR,
User STRUCT<Id BIGINT, Name VARCHAR, Description VARCHAR, ScreenName VARCHAR, URL VARCHAR, FollowersCount BIGINT, FriendsCount BIGINT>
)
WITH (kafka_topic='tweets', value_format='JSON');
CREATE STREAM tweets_new
WITH (kafka_topic='tweets-new') AS
SELECT
CreatedAt as created_at,
Id as tweet_id,
Text as tweet_text,
Source as source,
GeoLocation as geo_location,
User->Id as user_id,
User->Name as user_name,
User->Description as user_description,
User->ScreenName as user_screenname
FROM tweets_original ;
Вот пример записи, которая была записана в тему вывода (tweets-new
).
{
"CREATED_AT": 1535036410000,
"TWEET_ID": 1032643668614819800,
"TWEET_TEXT": "Sample text",
"SOURCE": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
"GEO_LOCATION": null,
"USER_ID": 123,
"USER_NAME": "John Smith",
"USER_DESCRIPTION": "Developer in Chief",
"USER_SCREENNAME": "newphonewhodis"
}
Однако для того, чтобы Kafka Connect мог передать эти записи в BigQuery, мне нужно прикрепить схему, например, так:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "CREATED_AT"
},
{
"type": "int64",
"optional": false,
"field": "TWEET_ID"
},
{
"type": "string",
"optional": false,
"field": "TWEET_TEXT"
}
...
],
"optional": false,
"name": "foobar"
},
"payload": {...}
}
В любом случае, я не вижу ничего в документах, показывающих, как я мог бы подойти к этому (возможно, я смотрю не в том месте). Любая помощь будет принята с благодарностью!
0 ответов
Это простое решение для KSQL, просто обновите второй поток до AVRO.
CREATE STREAM tweets_new
WITH (kafka_topic='tweets-new', value_format='AVRO') AS
SELECT
CreatedAt as created_at,
Id as tweet_id,
Text as tweet_text,
Source as source,
GeoLocation as geo_location,
User->Id as user_id,
User->Name as user_name,
User->Description as user_description,
User->ScreenName as user_screenname
FROM tweets_original ;
Затем в конфигурации Kafka Connect вы можете использовать AvroConvertor и разрешить эволюцию / управление схемой в Google Big Query.