Flink × TiDB Real-Time ETL: Notes on Pitfalls
Author: Wang Zhiwei
Flink Job Exceptions
```text
java.io.IOException: Corrupt Maxwell JSON message '{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}'.
    at org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.deserialize(MaxwellJsonDeserializationSchema.java:141)
```
Details
```text
2022-08-10 03:30:10,097 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-300, groupId=null] Cluster ID: pku7rMr1RxecrRDqbt9lGA
2022-08-10 03:30:10,100 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, product]], fields=[id, name]) -> DropUpdateBefore -> Sink: Sink(table=[default_catalog.default_database.sink_product], fields=[id, name]) (1/1)#107 (c2c2ed5aa949a7a7832b2f80fb35ae4f) switched from RUNNING to FAILED.
java.io.IOException: Corrupt Maxwell JSON message '{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}'.
    at org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.deserialize(MaxwellJsonDeserializationSchema.java:141) ~[flink-json-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) ~[flink-sql-connector-kafka_2.11-1.12.2%20(1).jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
Caused by: java.lang.NullPointerException
    at org.apache.flink.formats.json.maxwell.MaxwellJsonDeserializationSchema.deserialize(MaxwellJsonDeserializationSchema.java:126) ~[flink-json-1.12.2.jar:1.12.2]
    ... 7 more
```
How the Maxwell format differs between systems
Maxwell messages emitted by TiCDC
INSERT
```json
{
  "database": "tffi",
  "table": "product",
  "type": "insert",
  "ts": 1660101830,
  "data": {
    "id": 4,
    "name": "hammer"
  }
}
```
UPDATE
```json
{
  "database": "tffi",
  "table": "product",
  "type": "update",
  "ts": 1660102087,
  "data": {
    "id": 1,
    "name": "scoot"
  },
  "old": {
    "name": "scooter"
  }
}
```
DELETE
```json
{
  "database": "tffi",
  "table": "product",
  "type": "delete",
  "ts": 1660102097,
  "old": {
    "id": 1,
    "name": "scoot"
  }
}
```
The official Maxwell data format
https://maxwells-daemon.io/dataformat/
DELETE
```json
{
  "database":"test",
  "table":"e",
  "type":"delete",
  ...
  "data":{
    "id":1,
    "m":5.444,
    "c":"2016-10-21 05:33:54.631000",
    "comment":"I am a creature of light."
  }
}
```
"After a DELETE, `data` contains a copy of the row, just before it shuffled off this mortal coil."
The difference
For DELETE messages, official Maxwell carries the deleted row in the `data` field, whereas TiCDC carries it in the `old` field.
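To make the difference concrete, here is a minimal Go sketch that decodes a DELETE message and reports which field holds the deleted row. `maxwellMessage` and `deletedRow` are hypothetical names for illustration; the struct models only the fields visible in the sample messages, and the official-format input is a trimmed version of the example from the Maxwell docs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxwellMessage models only the fields seen in the samples. Both "data"
// and "old" are decoded, so the struct can hold either layout.
type maxwellMessage struct {
	Database string                 `json:"database"`
	Table    string                 `json:"table"`
	Type     string                 `json:"type"`
	Ts       int64                  `json:"ts"`
	Data     map[string]interface{} `json:"data"`
	Old      map[string]interface{} `json:"old"`
}

// deletedRow returns the row image of a DELETE message and the field it came
// from: "data" for official Maxwell, "old" for TiCDC.
func deletedRow(raw []byte) (map[string]interface{}, string, error) {
	var msg maxwellMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, "", err
	}
	if msg.Type != "delete" {
		return nil, "", fmt.Errorf("not a delete message: %q", msg.Type)
	}
	if msg.Data != nil {
		return msg.Data, "data", nil
	}
	if msg.Old != nil {
		return msg.Old, "old", nil
	}
	return nil, "", fmt.Errorf("delete message has neither data nor old")
}

func main() {
	ticdc := []byte(`{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}`)
	official := []byte(`{"database":"test","table":"e","type":"delete","data":{"id":1,"comment":"I am a creature of light."}}`)
	for _, raw := range [][]byte{ticdc, official} {
		row, field, err := deletedRow(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("row found in %q: %v\n", field, row)
	}
}
```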
Digging into the source
Flink
Source: MaxwellJsonDeserializationSchema.java (apache/flink, via Sourcegraph)
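```java
RowData row = jsonDeserializer.deserialize(message);
String type = row.getString(2).toString(); // "type" field
...
else if (OP_DELETE.equals(type)) {
    // "data" field is a row, contains deleted rows.
    // If the message has no "data" field (TiCDC puts the row in "old"),
    // getRow returns null and setRowKind throws a NullPointerException.
    RowData delete = row.getRow(0, fieldCount);
    delete.setRowKind(RowKind.DELETE);
    out.collect(delete);
}
```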
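Flink's Maxwell format follows the official Maxwell format: for a DELETE message it reads the row from the `data` field. A TiCDC DELETE message has no `data` field, so the lookup yields null, which is the NullPointerException at MaxwellJsonDeserializationSchema.java:126 that Flink rethrows as "Corrupt Maxwell JSON message".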
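The failure can be reproduced outside Flink with a small Go analogue of the DELETE branch above. This is not Flink code: `maxwellRow` and `readDeleteLikeFlink` are hypothetical names, and the sketch only mirrors the rule "a deleted row is taken from `data`", turning the null lookup into an error instead of an exception.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxwellRow decodes the three fields Flink's maxwell-json format works with.
// Old is decoded but, as in the DELETE branch, never consulted.
type maxwellRow struct {
	Data map[string]interface{} `json:"data"`
	Old  map[string]interface{} `json:"old"`
	Type string                 `json:"type"`
}

// readDeleteLikeFlink takes the deleted row from "data" only.
func readDeleteLikeFlink(raw []byte) (map[string]interface{}, error) {
	var row maxwellRow
	if err := json.Unmarshal(raw, &row); err != nil {
		return nil, err
	}
	if row.Type != "delete" {
		return nil, fmt.Errorf("this sketch only handles delete, got %q", row.Type)
	}
	if row.Data == nil {
		// Flink hits a NullPointerException here and wraps it as
		// "Corrupt Maxwell JSON message"; this sketch returns an error.
		return nil, fmt.Errorf("corrupt Maxwell JSON message '%s'", raw)
	}
	return row.Data, nil
}

func main() {
	ticdc := []byte(`{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}`)
	if _, err := readDeleteLikeFlink(ticdc); err != nil {
		fmt.Println(err)
	}
}
```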
TiCDC
Source: maxwell.go (pingcap/tiflow, via Sourcegraph)
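```go
// For a delete event, TiCDC writes every pre-image column (e.PreColumns)
// into value.Old; value.Data is not filled in this branch.
if e.IsDelete() {
	value.Type = "delete"
	for _, v := range e.PreColumns {
		switch v.Type {
		case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
			if v.Value == nil {
				value.Old[v.Name] = nil
			} else if v.Flag.IsBinary() {
				value.Old[v.Name] = v.Value
			} else {
				value.Old[v.Name] = string(v.Value.([]byte))
			}
		default:
			value.Old[v.Name] = v.Value
		}
	}
}
```

TiCDC therefore emits DELETE messages with the row in `old`, which Flink's Maxwell deserializer does not read.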
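Since TiCDC fills only `old` for deletes while Flink reads only `data`, one possible workaround (an assumption here, not necessarily the fix the author adopted) is to rewrite TiCDC DELETE messages into the official layout before Flink consumes them, for example in a small relay between the TiCDC topic and the topic the Flink job reads. `normalizeTiCDCDelete` is a hypothetical helper built only on Go's `encoding/json`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalizeTiCDCDelete (hypothetical) moves the row of a TiCDC-style DELETE
// message from "old" into "data", the field Flink's maxwell-json reads.
// Any other message is returned unchanged.
func normalizeTiCDCDelete(raw []byte) ([]byte, error) {
	// Decode into raw values so every other field is preserved as-is.
	var msg map[string]json.RawMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		return nil, err
	}
	var typ string
	if err := json.Unmarshal(msg["type"], &typ); err != nil {
		return nil, fmt.Errorf("missing or invalid \"type\": %w", err)
	}
	data, hasData := msg["data"]
	dataMissing := !hasData || string(data) == "null"
	old, hasOld := msg["old"]
	if typ != "delete" || !dataMissing || !hasOld {
		return raw, nil
	}
	msg["data"] = old
	delete(msg, "old")
	return json.Marshal(msg)
}

func main() {
	in := []byte(`{"database":"tffi","table":"product","type":"delete","ts":1660102097,"old":{"id":1,"name":"scoot"}}`)
	out, err := normalizeTiCDCDelete(in)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```

Decoding into `map[string]json.RawMessage` keeps the other fields' values intact, although `json.Marshal` re-emits the keys in sorted order, which does not matter to a JSON consumer.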
Solution
Reference: https://ververica.github.io/flink-cdc-connectors/release-2.1/content/formats/changelog-json.html