【万字长文】Flink cdc源码精讲(推荐收藏)

【万字长文】Flink cdc源码精讲(推荐收藏)

前言 flink-cdc源码地址 : https://github.com/ververica/flink-cdc-connectors

flink-cdc不再flink项目中,在flink1.11之后flink引入cdc功能,下面我们以源码深入了解flink-cdc实现原理,

我们主要以flink-cdc-mysql为主,其余代码基本差不太多

事先需要先简单了解一下debezium相关原理,flink-cdc是基于debezium实现的

一点建议 :

在阅读源码的时候,我们应该带着问题去思考,然后一步一步去阅读源码,在阅读源码的过程中,不要被一些不重要的点给占用过多的时间精力,并且一遍两遍是不会让我有一个清晰的印象的,毕竟别人多少年多少人的开发,看一两遍就可以理解的,在阅读某个框架源码之前,我们应该已经对该框架原理有一定的理解,然后根据我们的理解去验证他是代码实现的样子,或者带着思考去阅读,为什么这么实现,这么实现的好处是什么等.,其实代码都是一样的,只不过是每个人的实现方式不同,考虑的问题不同而已,要有一定的java基础,熟悉多线程,了解开发使用的相关接口(或者自己看了介绍之后很容易理解),如果基础不牢,更多的是建议先从基础学习,然后写一写代码测试,比如多线程的时候怎么做交互等,自己写一写,在后面阅读源码的时候会更容理解里面内容该内容要首先对cdc有一定的了解,知道cdc的相关原理,flink-cdc的实现基于debezium实现,该框架是开源的,可以先去了解一下,这样对于我们后面内容会更容易理解 谨记: 阅读的时候抓住重点!!!!!!!! , 不要被不重要的内容占用时间

一.项目结构(mysql-cdc为主)1. 目录结构

带有test项目都是用于测试的项目后缀带有cdc的表示一个database的连接器,区分sql与api形式flink-format-changelog-json : 用于解析json成RowData的模块flink-connector-debezium : 该模块封装debezium以及相关核心代码实现,并且修改了debezium的部分源码每个项目中都有test目录,里面有相关的测试代码,可以自行测试代码debug2. mysql项目源码包结构

debezium : debezium用到的相关类schema : mysql schema(表结构)相关代码source : mysql-cdc source实现代码,包括全量读mysql,分割器,读取器等相关table : cdc table实现代码主要以table dynamic factory的实现resrouces : 该目录用与spi方式动态加载table factory,用于sql创建table找到对应的工厂类二.mysql-cdc源码-SourceFucntion的单并行度的实现

基于RichSourceFunction的,单并行读取 1,11之前的source接口,已被标记Deprecated基于Source的,多并行度,1.11之后新出的srouce接口,实现要更复杂我们主要根据单并行度源码基于讲解,这样更方便理解

具体入手我们可以根据文档中的创建source的类来一点一点走

代码语言:javascript复制MySqlSource通过构建者模式(23种设计模式)构建,我们只需要知道我们可以设置哪些参数即可,这个比较容易理解

// 通过构建者方式配置任务启动时候所需要的参数

public static class Builder {// MysqlSourcen内部类

private int port = 3306; // default 3306 port

private String hostname;

private String[] databaseList;

private String username;

private String password;

private Integer serverId;

private String serverTimeZone; //时区

private String[] tableList;

private Properties dbzProperties; // 传入的dbz引擎所需的属性

private StartupOptions startupOptions = StartupOptions.initial(); // 用于控制开始binlog开始消费位置的参数

private DebeziumDeserializationSchema deserializer; // 用于对数据解析成什么样子如json,String等,定义序列化方式

//上面参数配置完成,通过build构建sourceFunctionn,主要将配置信息封装带properties中,这里面的参数主要是debezium所需要启动参数,配置信息等,如果想要了解可以去debezium官网查看参数的具体细节

public DebeziumSourceFunction build() {

Properties props = new Properties();

props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());

// hard code server name, because we don't need to distinguish it, docs:

// Logical name that identifies and provides a namespace for the particular MySQL

// database

// server/cluster being monitored. The logical name should be unique across all other

// connectors,

// since it is used as a prefix for all Kafka topic names emanating from this connector.

// Only alphanumeric characters and underscores should be used.

props.setProperty("database.server.name", DATABASE_SERVER_NAME);

props.setProperty("database.hostname", checkNotNull(hostname));

props.setProperty("database.user", checkNotNull(username));

props.setProperty("database.password", checkNotNull(password));

props.setProperty("database.port", String.valueOf(port));

props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));

// debezium use "long" mode to handle unsigned bigint by default,

// but it'll cause lose of precise when the value is larger than 2^63,

// so use "precise" mode to avoid it.

props.put("bigint.unsigned.handling.mode", "precise");

if (serverId != null) { props.setProperty("database.server.id", String.valueOf(serverId)); }

if (databaseList != null) { props.setProperty("database.whitelist", String.join(",", databaseList)); }

if (tableList != null) { props.setProperty("table.whitelist", String.join(",", tableList));}

if (serverTimeZone != null) { props.setProperty("database.serverTimezone", serverTimeZone); }

// 判断开始消费位置,在sqlSourceBuilder中构建的参数,没有则为null

DebeziumOffset specificOffset = null;

switch (startupOptions.startupMode) {

case INITIAL:

props.setProperty("snapshot.mode", "initial");

break;

case EARLIEST_OFFSET:

props.setProperty("snapshot.mode", "never");

break;

case LATEST_OFFSET:

props.setProperty("snapshot.mode", "schema_only");

break;

case SPECIFIC_OFFSETS:

props.setProperty("snapshot.mode", "schema_only_recovery");

specificOffset = new DebeziumOffset();

Map sourcePartition = new HashMap<>();

sourcePartition.put("server", DATABASE_SERVER_NAME);

specificOffset.setSourcePartition(sourcePartition);

Map sourceOffset = new HashMap<>();

sourceOffset.put("file", startupOptions.specificOffsetFile);

sourceOffset.put("pos", startupOptions.specificOffsetPos);

specificOffset.setSourceOffset(sourceOffset);

break;

case TIMESTAMP:

checkNotNull(deserializer);

props.setProperty("snapshot.mode", "never");

deserializer =

new SeekBinlogToTimestampFilter<>(

startupOptions.startupTimestampMillis, deserializer);

break;

default:

throw new UnsupportedOperationException();

}

if (dbzProperties != null) {

props.putAll(dbzProperties);

// Add default configurations for compatibility when set the legacy mysql connector

// implementation

if (LEGACY_IMPLEMENTATION_VALUE.equals(

dbzProperties.get(LEGACY_IMPLEMENTATION_KEY))) {

props.put("transforms", "snapshotasinsert");

props.put(

"transforms.snapshotasinsert.type",

"io.debezium.connector.mysql.transforms.ReadToInsertEvent");

}

}

// 构建通用的cdc sourceFunction --> 基于richSourceFunction

return new DebeziumSourceFunction<>(

deserializer, props, specificOffset,

new MySqlValidator(props) // mysql校验器,版本信息,binlog是否为row等

);

}

}

上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现

代码语言:javascript复制// source代码,用于读取binlog,logminer等

// 实现richSourceFuntion完成source端代码的编写,实现ChecckpointFunction用于保证容错相关的内容,实现checkpointListener监听checkpoint的完成状态

public class DebeziumSourceFunction extends RichSourceFunction

implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable {

// ------------------------------列出一些比较重要的成员变量,不重要的忽略了------------------------------------------

// ----------------------------------State-------------------------------------------------

/* 主要用于状态的维护,当任务出现问题重启/手动重启后,维护的一些schema(record中的结构) 未消费的records(在queue中,后面会看到) offset等信息 */

private transient volatile String restoredOffsetState;

private transient ListState offsetState;

private transient ListState schemaRecordsState;

// -----------------------------------Worker-----------------------------------------------

/* 一个单线程的线程池,一个debeziumEngine(一个runnable的实现类)用与读取binlog数据

TODO 所以设计到多线程的交互*/

private transient ExecutorService executor;

private transient DebeziumEngine engine;

/* 一个consumer,用于从engine中读取数据的消费者,并将数据放入handover中 */

private transient DebeziumChangeConsumer changeConsumer;

/* 用于从handover中拿取数据 */

private transient DebeziumChangeFetcher debeziumChangeFetcher;

/* 两个线程(source,engine)之间交互数据的一个桥梁 */

private transient Handover handover;

// ----------------------------------------我们主要介绍srouce的run方法,其他方法主要用于容错相关--------------------------------------

@Override

public void run(SourceContext sourceContext) throws Exception {

// TODO 用于engine执行的一些相关参数,不是终点内容,如果感兴趣可官网看看说明

properties.setProperty("name", "engine");

properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName());

if (restoredOffsetState != null) {

properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState);

}

properties.setProperty("include.schema.changes", "false");

properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE));

properties.setProperty("tombstones.on.delete", "false");

if (engineInstanceName == null) {

engineInstanceName = UUID.randomUUID().toString();

}

properties.setProperty(

FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);

properties.setProperty("database.history", determineDatabase().getCanonicalName());

String dbzHeartbeatPrefix =

properties.getProperty(

Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),

Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString());

this.debeziumChangeFetcher =

new DebeziumChangeFetcher<>(

sourceContext,

deserializer,

restoredOffsetState == null, // 是否是快照阶段或者state==null?

dbzHeartbeatPrefix,

handover);

// 创建并配置engine相关参数

this.engine =

DebeziumEngine.create(Connect.class)

.using(properties)// 参数

.notifying(changeConsumer) // 配饰consumer消费 engine读取的数据(binlog/历史数据)

.using(OffsetCommitPolicy.always()) // offset的提交策略

.using(

(success, message, error) -> {

if (success) {

handover.close();

} else {

handover.reportError(error);

}

})

.build();

// 将engine任务提交到线程池中执行

executor.execute(engine);

debeziumStarted = true;

// mertic相关配置i

MetricGroup metricGroup = getRuntimeContext().getMetricGroup();

// ....

// 启动fetcher,循环去hanover中拿取最新数据发送下游

debeziumChangeFetcher.runFetchLoop();

}

}上面我们已经看了source.run的基本实现,他的主要处理逻辑在DebeziumChangeConsumer,DebeziumChangeFetcher,Handover中

简单介绍三个类的作用和 主要 方法和参数

DebeziumChangeConsumer : 用于消费engine读取的数据

代码语言:javascript复制/* 该类实现 DebeziumEngine.ChangeConsumer接口,实现handlerBatch方法 相对比较简单, 另外两个成员方法主要是offset相关,非重点内容*/

// engine线程会调用handleBatch方法出传递引擎消费到的数据

public class DebeziumChangeConsumer

implements DebeziumEngine.ChangeConsumer> {

@Override

public void handleBatch(

List> events,

RecordCommitter> recordCommitter) {

try {

currentCommitter = recordCommitter;

// 间接调用到handover的produce方法,该方法是阻塞的 嘻嘻嘻(如果有历史records未被消费则wait)

handover.produce(events);

} catch (Throwable e) {

// Hold this exception in handover and trigger the fetcher to exit

handover.reportError(e);

}

}

} DebeziumChangeFetcher : 循环从handover中获取consumer从engine读取的最新数据

代码语言:javascript复制public class DebeziumChangeFetcher {

private final SourceFunction.SourceContext sourceContext;

/* 保证数据发送和状态更新的一把锁 */

private final Object checkpointLock;

/* 用于将数据转化成我们自定义的类型,如json,string等 */

private final DebeziumDeserializationSchema deserialization;

/* 下面自定义的collector*/

private final DebeziumCollector debeziumCollector;

/* 见名知意,很好理解 */

private final DebeziumOffset debeziumOffset;

/* 用于存储在stateoffset的序列化器*/

private final DebeziumOffsetSerializer stateSerializer;

/* 心跳相关*/

private final String heartbeatTopicPrefix;

/* 是否恢复的状态,需要消费历史相关数据*/

private boolean isInDbSnapshotPhase;

private final Handover handover;

public void runFetchLoop() throws Exception {

try {

// 读取mysql历史的数据,不要被名字所迷惑

if (isInDbSnapshotPhase) {

List> events = handover.pollNext();

synchronized (checkpointLock) {

LOG.info(

"Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");

handleBatch(events);

// 这里防止snapshot数据无法一次读取完毕,必须保证snapshot数据读取完毕才进入binlog的读取

while (isRunning && isInDbSnapshotPhase) {

handleBatch(handover.pollNext());

}

}

LOG.info("Received record from streaming binlog phase, released checkpoint lock.");

}

// 到这里表示snapshot的数据读取完毕,开始实时读取binlog数据

while (isRunning) {

// 具体的处理数据逻辑 pollNext会阻塞

handleBatch(handover.pollNext());

}

} catch (Handover.ClosedException e) {

// ignore

}

private void handleBatch(List> changeEvents)

throws Exception {

if (CollectionUtils.isEmpty(changeEvents)) {

return;

}

this.processTime = System.currentTimeMillis();

for (ChangeEvent event : changeEvents) {

SourceRecord record = event.value();

// time相关基本都是metric相关内容,不必较真

updateMessageTimestamp(record);

fetchDelay = processTime - messageTimestamp;

// 通过心跳机制来更新offset

if (isHeartbeatEvent(record)) {

synchronized (checkpointLock) {

debeziumOffset.setSourcePartition(record.sourcePartition());

debeziumOffset.setSourceOffset(record.sourceOffset());

}

continue;

}

// 根据不同的deserialization对数据做转换,---> 可以看这个,比较容易理解StringDebeziumDeserializationSchema, 内部直接 record.toString即可,就是将debezium读取的record转换成我们想要的格式或者类型,debeziumCollector 就是下面自定义的collector,在deserialize中,会将转换完成的数据放入queue中

deserialization.deserialize(record, debeziumCollector);

// 判断数据是否为snapshot的最后一条数据,如果是则在这条数据之后转换到binlog的streaming流程

if (!isSnapshotRecord(record)) {

LOG.debug("Snapshot phase finishes.");

isInDbSnapshotPhase = false;// runFetchLoop方法中使用

}

// 具体发送数据

emitRecordsUnderCheckpointLock(

debeziumCollector.records, record.sourcePartition(), record.sourceOffset());

}

}

private void emitRecordsUnderCheckpointLock(

Queue records, Map sourcePartition, Map sourceOffset) {

// 同步是保证数据的发送和offset的更新是安全,lock是可重入的(不懂可以百度,java基础内容)

synchronized (checkpointLock) {

T record;

// 循环debeziumCollector的records队列,将队列中的数据依次发送到下游,

while ((record = records.poll()) != null) {

emitDelay = System.currentTimeMillis() - messageTimestamp;

// 通过source的context对象将其发送到下游operator,这里转入了flink的处理逻辑,不再cdc代码之内

sourceContext.collect(record);

}

debeziumOffset.setSourcePartition(sourcePartition);

debeziumOffset.setSourceOffset(sourceOffset);

}

}

// 心跳机制 ,用于更新offset的机制

private boolean isHeartbeatEvent(SourceRecord record) {

String topic = record.topic();

return topic != null && topic.startsWith(heartbeatTopicPrefix);

}

// --------------------------------自定义collector-------------------------------------------------------

private class DebeziumCollector implements Collector {

private final Queue records = new ArrayDeque<>();

@Override

public void collect(T record) {

// 将数据放入队列,queue会在别的地方进出列将数据发送下游

records.add(record);

}

}

}Handover : source线程和engine线程执行中数据交互桥梁

代码语言:javascript复制/* 这个类由两个线程访问, pollNext由debeziumFetcher调用,produce有debeziumConsumer调用,因为涉及多线程的调用,单纯的讲代码可能不容易理解,可以去复习一下java多线程知识内容,或者自己debug一下看看调用流程就比较容易理解了 */

@ThreadSafe //表示类是线程安全的,这类涉及engine和source线程两个线程操作,内部的实现保证了线程安全

public class Handover implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(Handover.class);

private final Object lock = new Object();

@GuardedBy("lock") // 注解表示该变量受lock的保护, 不是重点勿关注

private List> next;

@GuardedBy("lock")

private Throwable error;

private boolean wakeupProducer;

/* debeziumFetcher 调用,当没有数据的时候进入wait状态,wait状态的时候cpu是不会调用wait状态的线程,另一个线程就可以占用cpu的全部时间片*/

public List> pollNext() throws Exception {

// 同步代码块才可以使用wait和notifyAll,为什么使用这种方式,因为只有两个线程,所以这种方式实现简单,如果线程多可以通过juc的lock去做或者其他方式也可以

synchronized (lock) {

// 没有数据没有异常则持续循环进入wait状态,为了防止虚假唤醒的情况

while (next == null && error == null) {

lock.wait();

}

List> n = next;

// 上面的循环可以退出的时候,说明一定是有数据或者有异常,不存在其他的情况

if (n != null) {

// 将next置为null 下面会根据此条件作为判断条件

next = null;

// 唤醒其他等待线程,当然只可能是engine线程

lock.notifyAll();

return n;

} else {

// 将异常抛出

ExceptionUtils.rethrowException(error, error.getMessage());

// 上面方法一定会抛出异常,改代码只是为了去掉编译警告...

return Collections.emptyList();

}

}

}

public void produce(final List> element)

throws InterruptedException {

checkNotNull(element);

synchronized (lock) {

// next不等一直进入wait状态

while (next != null && !wakeupProducer) {

lock.wait();

}

wakeupProducer = false;

// 有异常抛出异常,没异常将接受新数据,并唤醒fetcher线程

if (error != null) {

ExceptionUtils.rethrow(error, error.getMessage());

} else {

next = element;

lock.notifyAll();

}

}

}

}

上面代码即是基于RichSourceFunction实现的cdc主要代码,其实不算难,但是前人写的代码是已经把很多问题已经考虑进入,对代码的抽象也很好,扩展起来很方便,api设计对与我们开发者来说很容易使用

三.mysql-cdc源码-新Source接口的实现1.11版本之后flink提供了新的source接口,可以提前预习一波https://issues.apache.org/jira/browse/FLINK-10740

简单介绍一下

SourceReader : 对split的的数据进行读取操作,比如: 读取一个分区,一个块等,当然不只局限与一个分区,根据自己的实现来

SplitEnumerator : 负责对数据源进行切分或者发现分区等,比如: 发现kafka的分区,对文件划分块等

上述的比较简单,实际上比这复杂一点,所以在新的source接口实现一个source是比较难的事情,不过熟悉之后都一样

提前说明 :

一个split我们可以认为是一个切片,mysql-cdc中, 假想情况下 : 一张的一部分中, 比如 开始主键 1 到 结束主键 10 ,那么该split就表示这些数据,在具体读取数据的时候是有readTask来去读,那么他就会通过split标记的点位来进行数据的读取,当然一个readTask不止会执行一个split;

snapshot表示的是读取数据库的历史全量数据

binlog 表示当我们snapshot阶段结束后开始binlog阶段,即我们开始读取的binlog数据了

先执行snapshot阶段,后执行binlog阶段

代码的生成和旧版是相同的,只不过是内部执行的逻辑存在变化,新的source接口实现的cdc代码比较复杂,涉及的内容比较多,可能比较晕,后面自己可以根据源码debug走一走

由于代码过多,主要讲解重点的内容,不重要的跳过了

代码语言:javascript复制// 实现了两个接口 source,和 resultTypeQueryable(比较简单就一个获取结果类型信息的接口) , 主要代码还是在source接口的实现

// T 为输出类型,MySqlSplit是mysql的分割器,PendingSplitsState表示Enumerator的状态对象

public class MySqlSource

implements Source, ResultTypeQueryable {

private final MySqlSourceConfigFactory configFactory;

private final DebeziumDeserializationSchema deserializationSchema;

/* 通过构造者模式构建source所需要的参数,简单说明一下,里面的参数,通过MySqlSourceConfigFactory添加参数,在build方法中,将factory作为参数构建出MySqlSource

-------------------------------------讲解一下对应关系------------------------------------------------

MySqlSourceConfigFactory 可以根据不同的subtask创建对应的MySqlSourceConfig

MySqlSourceConfig 可以构建 MySqlConnectorConfig

MySqlConnection 通过 DebeziumUtil.createMySqlConnection(mySqlSourceConfig.getDbzConfiguration())方法构建

上面的一个config比较混乱,名字也比较不容易理解,后面用到的时候会简单提一下,这里主要是有一个印象,不要被一些配置搞混

*/

public static MySqlSourceBuilder builder() {

return new MySqlSourceBuilder<>();

}

// 由MySqlSourceBuilder.build方法创建

MySqlSource(

MySqlSourceConfigFactory configFactory,

DebeziumDeserializationSchema deserializationSchema // 与老版source的deserialization一样

) {

this.configFactory = configFactory;

this.deserializationSchema = deserializationSchema;

}

@Override // 流批一体的source,表示有界性,新source接口的特性

public Boundedness getBoundedness() {return Boundedness.CONTINUOUS_UNBOUNDED; }

/*构建sourceReader */

@Override

public SourceReader createReader(SourceReaderContext readerContext)

throws Exception {

// 前面提到了,根据subtask索引创建对应的config

MySqlSourceConfig sourceConfig =

configFactory.createConfig(readerContext.getIndexOfSubtask());

// 一个阻塞队列,多线程交互用的,不必深入

FutureCompletingBlockingQueue> elementsQueue =

new FutureCompletingBlockingQueue<>();

// metric相关

final MySqlSourceReaderMetrics sourceReaderMetrics =

new MySqlSourceReaderMetrics(readerContext.metricGroup());

sourceReaderMetrics.registerMetrics();

// 通过supplier函数构建一个SplitReader,解耦的作用,主要看里面的MySqlSplitReader实现即可

Supplier splitReaderSupplier =

// 拿到每个reader的config和对应的subtask index

() -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());

// 构建了一个具体的sourceReader

return new MySqlSourceReader<>(

elementsQueue,

splitReaderSupplier,

new MySqlRecordEmitter<>(

deserializationSchema,

sourceReaderMetrics,

sourceConfig.isIncludeSchemaChanges()),

readerContext.getConfiguration(),

readerContext,

sourceConfig);

}

@Override

public SplitEnumerator createEnumerator(

SplitEnumeratorContext enumContext) {

// 因为只会生成一次所以生成一个sourceConfig即可

MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

// 检验mysql

final MySqlValidator validator = new MySqlValidator(sourceConfig);

validator.validate();

final MySqlSplitAssigner splitAssigner;

// 判断开始条件如果是initial则先读取mysql table的数据(代码中叫做snapshot),然后再继续读取binlog的数据,如果不是initial状态,则直接从binlog开始读取

if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {

try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {

final List remainingTables = discoverCapturedTables(jdbc, sourceConfig);

boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);

splitAssigner =

// 里面包含 snapshot和binlog的split逻辑

new MySqlHybridSplitAssigner(

sourceConfig,

enumContext.currentParallelism(),

remainingTables,

isTableIdCaseSensitive);

} catch (Exception e) {

throw new FlinkRuntimeException(

"Failed to discover captured tables for enumerator", e);

}

} else {

// 之有binlog的split逻辑

splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);

}

// 创建对应发的SplitEnumerator,用于构建split给reader读取

return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);

}

// 恢复SplitEnumerato,比如任务故障重启,会根据不同的checkpoint恢复SplitEnumerator,用于继续之前未完成的读取操作

@Override

public SplitEnumerator restoreEnumerator(

SplitEnumeratorContext enumContext, PendingSplitsState checkpoint) {

MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

final MySqlSplitAssigner splitAssigner;

if (checkpoint instanceof HybridPendingSplitsState) {

splitAssigner =

new MySqlHybridSplitAssigner(

sourceConfig,

enumContext.currentParallelism(),

(HybridPendingSplitsState) checkpoint);

} else if (checkpoint instanceof BinlogPendingSplitsState) {

splitAssigner =

new MySqlBinlogSplitAssigner( sourceConfig, (BinlogPendingSplitsState) checkpoint);

} else {

throw new UnsupportedOperationException( "Unsupported restored PendingSplitsState: " + checkpoint);

}

return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);

}

// -----------------容错相关,不是重点-----------------

@Override

public SimpleVersionedSerializer getSplitSerializer() { return MySqlSplitSerializer.INSTANCE; }

@Override

public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { return new PendingSplitsStateSerializer(getSplitSerializer());}

// 返回值类型的提取

@Override

public TypeInformation getProducedType() {return deserializationSchema.getProducedType();}

}

上面的代码中我们可以看到source的实现,主要是构建sourceReader和splitEnumerator,以及容错内容,相关的处理逻辑也封装在相应的对象中,下面我们对其内部逐步剖析

代码语言:javascript复制/*在看其他内容之前,我们可以看看如何对mysql进行split操作,在snapshot是通过主键来split的,binlog的只从当前offset位置开始消费,

这里是混合的一个split,另外还存在binlog和snapshot的splitAssigner,不过我们根据主要看看大致逻辑,具体到某一直可以自己阅读理解,

解释一下 : 先读取mysql历史数据即snapshot阶段,然后再进行当前mysql-binlog的位置开始消费,所以这个混合的意义就是先读取全量数据,然后从最新的binlog开始读取,完成cdc读取数据的过程*/

public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {

private final int splitMetaGroupSize;

private boolean isBinlogSplitAssigned;

private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;

public MySqlHybridSplitAssigner(

MySqlSourceConfig sourceConfig,

int currentParallelism,

List remainingTables,

boolean isTableIdCaseSensitive) {

this(

// 创建snapshot split

new MySqlSnapshotSplitAssigner(

sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),

false,

sourceConfig.getSplitMetaGroupSize());

}

public MySqlHybridSplitAssigner(

MySqlSourceConfig sourceConfig,

int currentParallelism,

HybridPendingSplitsState checkpoint) {

this(

new MySqlSnapshotSplitAssigner(

sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),

checkpoint.isBinlogSplitAssigned(),

sourceConfig.getSplitMetaGroupSize());

}

private MySqlHybridSplitAssigner(

MySqlSnapshotSplitAssigner snapshotSplitAssigner,

boolean isBinlogSplitAssigned,

int splitMetaGroupSize) {

this.snapshotSplitAssigner = snapshotSplitAssigner;

this.isBinlogSplitAssigned = isBinlogSplitAssigned;

this.splitMetaGroupSize = splitMetaGroupSize;

}

@Override

public void open() {

snapshotSplitAssigner.open();

}

// 主要返回下一个split,没有则返回一个空, optional可以jdk8的新特性,用于解决空指针的一个类

@Override

public Optional getNext() {

// 下面的方法可以见名知意,自行理解即可

if (snapshotSplitAssigner.noMoreSplits()) {

if (isBinlogSplitAssigned) {

return Optional.empty();

} else if (snapshotSplitAssigner.isFinished()) { // 当snapshot完成后,开始binlog的split流程

// we need to wait snapshot-assigner to be finished before

// assigning the binlog split. Otherwise, records emitted from binlog split

// might be out-of-order in terms of same primary key with snapshot splits.

isBinlogSplitAssigned = true;

return Optional.of(createBinlogSplit());

} else {

// binlog split is not ready by now

return Optional.empty();

}

} else {

// snapshot assigner still have remaining splits, assign split from it

return snapshotSplitAssigner.getNext();

}

}

// splitAssigner是否在等待已完成split回调,即onFinishedSplits

@Override

public boolean waitingForFinishedSplits() {

return snapshotSplitAssigner.waitingForFinishedSplits();

}

// 获取已完成的split并且包含他的元数据,可以根据已经完成snapshot(snapshot的某一个split)生成对应binlog的split

@Override

public List getFinishedSplitInfos() {

return snapshotSplitAssigner.getFinishedSplitInfos();

}

// 使用已完成的binlog偏移量来处理已完成的split,用于确定何时生成binlog split以及生成什么binlog split,就是回调

@Override

public void onFinishedSplits(Map splitFinishedOffsets) {

snapshotSplitAssigner.onFinishedSplits(splitFinishedOffsets);

}

// 向此splitAssigner添加一组spilt,当某些split处理失败,则需要重新添加分割时调用此方法

@Override

public void addSplits(Collection splits) {

List snapshotSplits = new ArrayList<>();

for (MySqlSplit split : splits) {

if (split.isSnapshotSplit()) {

snapshotSplits.add(split);

} else {

// we don't store the split, but will re-create binlog split later

isBinlogSplitAssigned = false;

}

}

snapshotSplitAssigner.addSplits(snapshotSplits);

}

// ----------------------------checkpoint 容错相关----------------------------------------

@Override

public PendingSplitsState snapshotState(long checkpointId) {

return new HybridPendingSplitsState(

snapshotSplitAssigner.snapshotState(checkpointId), isBinlogSplitAssigned);

}

@Override

public void notifyCheckpointComplete(long checkpointId) {

snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);

}

@Override

public void close() {

snapshotSplitAssigner.close();

}

// ------------------------------------binlog split部分-------------------------------------------

// 构建biglog split, 就是根据已经完成snapshot split来构建binlog split的一个过程,split代码比较简单可以自行阅读

// 简单介绍一下 就是描述binlog的split,snapshot的split相关内容,比如snapshot,会按照主键去做split,已经table的schemas相关信息

private MySqlBinlogSplit createBinlogSplit() {

final List assignedSnapshotSplit =

snapshotSplitAssigner.getAssignedSplits().values().stream()

.sorted(Comparator.comparing(MySqlSplit::splitId))

.collect(Collectors.toList());

Map splitFinishedOffsets =

snapshotSplitAssigner.getSplitFinishedOffsets();

final List finishedSnapshotSplitInfos = new ArrayList<>();

BinlogOffset minBinlogOffset = null;

for (MySqlSnapshotSplit split : assignedSnapshotSplit) {

// find the min binlog offset

BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());

if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {

minBinlogOffset = binlogOffset;

}

finishedSnapshotSplitInfos.add(

new FinishedSnapshotSplitInfo(

split.getTableId(),

split.splitId(),

split.getSplitStart(),

split.getSplitEnd(),

binlogOffset));

}

boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;

return new MySqlBinlogSplit(

BINLOG_SPLIT_ID,

minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,

BinlogOffset.NO_STOPPING_OFFSET,

divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,

new HashMap<>(),

finishedSnapshotSplitInfos.size());

}

}

现在我们开始介绍sourceReader和SplitEnumerator

sourceReader :

代码语言:javascript复制

/* SingleThreadMultiplexSourceReaderBase */

public class MySqlSourceReader

extends SingleThreadMultiplexSourceReaderBase {

private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);

private final MySqlSourceConfig sourceConfig;

private final Map finishedUnackedSplits;

private final Map uncompletedBinlogSplits;

private final int subtaskId;

public MySqlSourceReader(

FutureCompletingBlockingQueue> elementQueue,

Supplier splitReaderSupplier,

RecordEmitter recordEmitter,

Configuration config,

SourceReaderContext context,

MySqlSourceConfig sourceConfig) {

super(

elementQueue,

// 一个单线程的fetcher管理器,做一些读取操作

// 简单描述一下流程

// SingleThreadFetcherManager.createSplitFetcher 构建一个SplitFetcher(实现了Runnable),在SplitFetcher中会构建一个fetcherTask,SplitFetcher.run方法中,循环调用this.runOnce(),this.runOnce()会持续调用fetcherTask.run()读取数据,run()会调用MySqlSplitReader.fetch方法,返回reader读取的数据,并将数据放入到elementQueue中,只要涉及都多线程的代码,都比较晦涩难懂

new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),

recordEmitter,

config,

context);

this.sourceConfig = sourceConfig;

this.finishedUnackedSplits = new HashMap<>();

this.uncompletedBinlogSplits = new HashMap<>();

this.subtaskId = context.getIndexOfSubtask();

}

// 启动reader

@Override

public void start() {

if (getNumberOfCurrentlyAssignedSplits() == 0) {

// 发送split的请求到splitEnumerator,会调用到SplitEnumerator.handleSplitRequest(int, String)方法,会带这并行的reader的subtask id 和hostname

context.sendSplitRequest();

}

}

// 当reader分配到新的split的时候,会初始化一个split的state

@Override

protected MySqlSplitState initializedState(MySqlSplit split) {

if (split.isSnapshotSplit()) {

return new MySqlSnapshotSplitState(split.asSnapshotSplit());

} else {

return new MySqlBinlogSplitState(split.asBinlogSplit());

}

}

@Override // 容错相关, skip

public List snapshotState(long checkpointId) {

// unfinished splits

List stateSplits = super.snapshotState(checkpointId);

// add finished snapshot splits that didn't receive ack yet

stateSplits.addAll(finishedUnackedSplits.values());

// add binlog splits who are uncompleted

stateSplits.addAll(uncompletedBinlogSplits.values());

return stateSplits;

}

// 清理处理已完成的split状态,非重点

@Override

protected void onSplitFinished(Map finishedSplitIds) {

for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {

MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();

checkState(

mySqlSplit.isSnapshotSplit(),

String.format(

"Only snapshot split could finish, but the actual split is binlog split %s",

mySqlSplit));

finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());

}

reportFinishedSnapshotSplitsIfNeed();

context.sendSplitRequest();

}

/*添加此reader要read的split列表,当splitEnumerator通过splitEnumeratorContext分配一个splut时,将调用此方法

即调用 context.assignSplit(SourceSplit, int) 或者 context.assignSplits(SplitsAssignment).

*/

@Override

public void addSplits(List splits) {

List unfinishedSplits = new ArrayList<>();

for (MySqlSplit split : splits) {

// 判断是否是snapshot还是binlog split

if (split.isSnapshotSplit()) {

// 如果split已经read完成放入完成集合,否则放入未完成的集合中

MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();

if (snapshotSplit.isSnapshotReadFinished()) {

finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);

} else {

unfinishedSplits.add(split);

}

} else {

if (!split.asBinlogSplit().isCompletedSplit()) {

//如果binlog split未完成则加入未完成的列表中,并想spluitEnumerator发送请求binlog split meta的事件

uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());

requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());

} else {

// 未完成的split集合删除该split ,未完成的集合表示没有split meta信息

uncompletedBinlogSplits.remove(split.splitId());

// 创建binlog split, 带有table schema信息

MySqlBinlogSplit mySqlBinlogSplit =

discoverTableSchemasForBinlogSplit(split.asBinlogSplit());

// 添加到未完成的splits,后续会进行read操作

unfinishedSplits.add(mySqlBinlogSplit);

}

}

}

// notify split enumerator again about the finished unacked snapshot splits

reportFinishedSnapshotSplitsIfNeed();

// add all un-finished splits (including binlog split) to SourceReaderBase

// TODO 当调用spuer.addSplits的时候,会启动fetcherManager,开始读取数据的操作

super.addSplits(unfinishedSplits);

}

private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {

final String splitId = split.splitId();

// 如果tableSchema不存在则填充,如果已经存在,则直接返回split即可

if (split.getTableSchemas().isEmpty()) {

try (MySqlConnection jdbc =

// 静态方法,构建一个mysqlConnection,可以认为就是一个jdbc连接 ,不必深入

DebeziumUtils.createMySqlConnection(sourceConfig.getDbzConfiguration())) {

Map tableSchemas =

// 静态方法,根据我们sourceBuilder构建的时候给定的database和tablelist来构建对应的tableId和TableChange,然后我们在面read的时候需要, 不必深入工具类

TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);

LOG.info("The table schema discovery for binlog split {} success", splitId);

// 静态方法,构建一个带有tableSchema的MysqlBinlogSpilt,不必深入

return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);

} catch (SQLException e) {

LOG.error("Failed to obtains table schemas due to {}", e.getMessage());

throw new FlinkRuntimeException(e);

}

} else {

LOG.warn("The binlog split {} has table schemas yet, skip the table schema discovery",split);

return split;

}

}

// 处理source自定义事件,接收来自splitEumumerator,与splitEumumerator类似

@Override

public void handleSourceEvents(SourceEvent sourceEvent) {

if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {

FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;

LOG.debug(

"The subtask {} receives ack event for {} from enumerator.",

subtaskId,

ackEvent.getFinishedSplits());

for (String splitId : ackEvent.getFinishedSplits()) {

this.finishedUnackedSplits.remove(splitId);

}

} else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {

// report finished snapshot splits

LOG.debug(

"The subtask {} receives request to report finished snapshot splits.",

subtaskId);

reportFinishedSnapshotSplitsIfNeed();

} else if (sourceEvent instanceof BinlogSplitMetaEvent) {

LOG.debug(

"The subtask {} receives binlog meta with group id {}.",

subtaskId,

((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());

fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);

} else {

super.handleSourceEvents(sourceEvent);

}

}

// 发送请求binlogSplit meta的事件

private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {

final String splitId = binlogSplit.splitId();

if (!binlogSplit.isCompletedSplit()) {

final int nextMetaGroupId =

getNextMetaGroupId(

binlogSplit.getFinishedSnapshotSplitInfos().size(),

sourceConfig.getSplitMetaGroupSize());

BinlogSplitMetaRequestEvent splitMetaRequestEvent =

new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);

context.sendSourceEventToCoordinator(splitMetaRequestEvent);

} else {

LOG.info("The meta of binlog split {} has been collected success", splitId);

this.addSplits(Arrays.asList(binlogSplit));

}

}

// 我们发送了请求meta的event后,会收到binlog split meta,我们需要填充至binlogSplit中

private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {

MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());

if (binlogSplit != null) {

final int receivedMetaGroupId = metadataEvent.getMetaGroupId();

final int expectedMetaGroupId =

getNextMetaGroupId(

binlogSplit.getFinishedSnapshotSplitInfos().size(),

sourceConfig.getSplitMetaGroupSize());

if (receivedMetaGroupId == expectedMetaGroupId) {

List metaDataGroup =

metadataEvent.getMetaGroup().stream()

.map(FinishedSnapshotSplitInfo::deserialize)

.collect(Collectors.toList());

uncompletedBinlogSplits.put(

binlogSplit.splitId(),

MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));

LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());

} else {

LOG.warn("Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",metadataEvent.getSplitId(), receivedMetaGroupId,expectedMetaGroupId);

}

// 继续发送请求meta event

requestBinlogSplitMetaIfNeeded(binlogSplit);

} else {

LOG.warn( "Received binlog meta event for split {}, but the uncompleted split map does not contain it", metadataEvent.getSplitId());

}

}

// state变成不可变的state

@Override

protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); }

}

splitEnumerator :

处理sourceReader的split请求将split分配给sourceReader代码语言:javascript复制// 继承SplitEnumerator,并重写其方法

public class MySqlSourceEnumerator implements SplitEnumerator {

private static final long CHECK_EVENT_INTERVAL = 30_000L;

private final SplitEnumeratorContext context;

private final MySqlSourceConfig sourceConfig;

private final MySqlSplitAssigner splitAssigner;

// using TreeSet to prefer assigning binlog split to task-0 for easier debug

private final TreeSet readersAwaitingSplit;

private List> binlogSplitMeta;

public MySqlSourceEnumerator(

SplitEnumeratorContext context,

MySqlSourceConfig sourceConfig,

MySqlSplitAssigner splitAssigner) {

// source.createEnumerator传入的context对象

this.context = context;

this.sourceConfig = sourceConfig;

this.splitAssigner = splitAssigner;

this.readersAwaitingSplit = new TreeSet<>();

}

@Override

public void start() {

splitAssigner.open(); //调用splitAssigner的open方法,可以具体看看每个splitAssigner的实现

// 注册一个Callable,定期调用,主要的作用就是当reader出现通信失败或者故障重启之后,检查是否有错过的通知时间,不是终点

this.context.callAsync(

this::getRegisteredReader,

this::syncWithReaders,

CHECK_EVENT_INTERVAL,

CHECK_EVENT_INTERVAL);

}

// 处理split的请求,当有具体给定子subtask id的reader调用SourceReaderContext.sendSplitRequest()方法时,将调用此方法。

@Override

public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {

if (!context.registeredReaders().containsKey(subtaskId)) {

// reader failed between sending the request and now. skip this request.

return;

}

// 将请求的taskId放入等待列表

readersAwaitingSplit.add(subtaskId);

// 对等待列表的subtask进行fen'pei

assignSplits();

}

// 将split添加至splitEnumerator,只有在最后一个成功的checkpoint之后,分配的spilt才会出现此情况,说明需要重新处理.

@Override

public void addSplitsBack(List splits, int subtaskId) {

LOG.debug("MySQL Source Enumerator adds splits back: {}", splits);

splitAssigner.addSplits(splits);

}

// 处理sourceReader的自定义event

@Override

public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {

// sourceReader发送给splitEnumerator的SourceEvent通知snapshot的split已经读取完成,binlog的位置是一致的

if (sourceEvent instanceof FinishedSnapshotSplitsReportEvent) {

LOG.info(

"The enumerator receives finished split offsets {} from subtask {}.",

sourceEvent,

subtaskId);

FinishedSnapshotSplitsReportEvent reportEvent =

(FinishedSnapshotSplitsReportEvent) sourceEvent;

Map finishedOffsets = reportEvent.getFinishedOffsets();

// 上面splitAssigner介绍过

splitAssigner.onFinishedSplits(finishedOffsets);

// 返回ACK事件返回给redaer的表示已经确认了snapshot

FinishedSnapshotSplitsAckEvent ackEvent =

new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet()));

context.sendEventToSourceReader(subtaskId, ackEvent);

}

// sourceReader发送给splitEnumerator的SourceEvent用来拉取binlog元数据,也就是发送BinlogSplitMetaEvent

else if (sourceEvent instanceof BinlogSplitMetaRequestEvent) {

LOG.debug(

"The enumerator receives request for binlog split meta from subtask {}.",

subtaskId);

// 发送binlog meta

sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);

}

}

@Override

public PendingSplitsState snapshotState(long checkpointId) {

return splitAssigner.snapshotState(checkpointId);

}

@Override

public void notifyCheckpointComplete(long checkpointId) {

splitAssigner.notifyCheckpointComplete(checkpointId);

// binlog split may be available after checkpoint complete

assignSplits();

}

// ------------------------------------------------------------------------------------------

// 为等待列表的subtask分配

private void assignSplits() {

// treeSet返回的iter是排好序的,即按照subtask id顺序依次处理

final Iterator awaitingReader = readersAwaitingSplit.iterator();

while (awaitingReader.hasNext()) {

int nextAwaiting = awaitingReader.next();

// 如果reader再次请求的split在此期间失败,则将其从等待列表中删除

if (!context.registeredReaders().containsKey(nextAwaiting)) {

awaitingReader.remove();

continue;

}

Optional split = splitAssigner.getNext();

if (split.isPresent()) {

final MySqlSplit mySqlSplit = split.get();

// 为subtask分配split

context.assignSplit(mySqlSplit, nextAwaiting);

awaitingReader.remove();

LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);

} else {

// there is no available splits by now, skip assigning

// 前面splitAssigner中会分配空值,在这里被过滤掉

break;

}

}

}

// 发送给binlog meta event到reader

private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {

// 如果binlog meta ==null 则进行meta的初始化操作

if (binlogSplitMeta == null) {

final List finishedSnapshotSplitInfos =

splitAssigner.getFinishedSplitInfos();

if (finishedSnapshotSplitInfos.isEmpty()) {

LOG.error(

"The assigner offer empty finished split information, this should not happen");

throw new FlinkRuntimeException(

"The assigner offer empty finished split information, this should not happen");

}

binlogSplitMeta =

Lists.partition(

finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());

}

final int requestMetaGroupId = requestEvent.getRequestMetaGroupId();

if (binlogSplitMeta.size() > requestMetaGroupId) {

// 获取对应的FinishedSnapshotSplitInfo列表,并将其封序列化,生成meta event

List metaToSend = binlogSplitMeta.get(requestMetaGroupId);

BinlogSplitMetaEvent metadataEvent =

new BinlogSplitMetaEvent(

requestEvent.getSplitId(),

requestMetaGroupId,

metaToSend.stream()

.map(FinishedSnapshotSplitInfo::serialize)

.collect(Collectors.toList()));

// 将生成的meta evnet 发送给reader

context.sendEventToSourceReader(subTask, metadataEvent);

} else {

LOG.error(

"Received invalid request meta group id {}, the invalid meta group id range is [0, {}]",

requestMetaGroupId,

binlogSplitMeta.size() - 1);

}

}

}

上面两个类中我们没有看到具体的读数据逻辑,实际上当系统调用addSplit()的时候就开始启动任务了,由于调用链比较长,为了方便观看,我这里直接截图看代码,看看代码是怎么开始进入执行逻辑的,

sourceReader中创建的fetcherManager,存入父类成员变量中当sourceReader调用addSplits的会调用父类的addSplits方法调用我们传入的fetcherManager的addSplits方法调用fetcherManager的addSplits方法时,子类没有覆写父类方法,直接进入父类方法,这里直接进入父类的splits方法,如果fetcher没有启动,则创建fetcher(一个runnable对象),然后提交到线程池执行任务上面可以看到我们的fetcher已经启动了,那我们就看看fetcher具体做了什么样子的事情(要记住上面传入了一个队列,fetcher中读取的数据会放入队列中),createFetcher时候,实际是创建的SplitFetcher,有flink新source中提供类

代码语言:javascript复制/*由于SplitFetcher是一个runnable对象,所以我们直接进入run方法看看做了什么即可

先介绍一下流程 :

1. 当构建fetcher的时候在构造方法中,我们传递了一个splitReader,这个是负责真实读取数据的(实际上是mysqlSplitReader)

2. fetcher构造方法中构建了一个FetcherTask,run之后会开始task的执行,如果还记得的话 我们在startFetcher()之后调用了一个fetcher的addSplit方法,该方法会将splits构建成tasks加入的taskQueue

3. 里面会有一些空闲,唤醒等不重要的逻辑,我给删除掉了,不重要,不要占用过多时间,因为非cdc内容

*/

private static final Logger LOG = LoggerFactory.getLogger(SplitFetcher.class);

private static final SplitFetcherTask WAKEUP_TASK = new DummySplitFetcherTask("WAKEUP_TASK");

private final int id;

private final BlockingDeque taskQueue;

// track the assigned splits so we can suspend the reader when there is no splits assigned.

private final Map assignedSplits;

private final FutureCompletingBlockingQueue> elementsQueue;

private final SplitReader splitReader;

private final Consumer errorHandler;

private final Runnable shutdownHook;

private final AtomicBoolean wakeUp;

private final AtomicBoolean closed;

private final FetchTask fetchTask;

private volatile SplitFetcherTask runningTask = null;

private final Object lock = new Object();

SplitFetcher(

int id,

FutureCompletingBlockingQueue> elementsQueue,

SplitReader splitReader,

Consumer errorHandler,

Runnable shutdownHook,

Consumer> splitFinishedHook) {

this.id = id;

// task队列,包含WAKEUP_TASK(特定情况下唤醒fetcher线程用),以及我们读取任务的task

this.taskQueue = new LinkedBlockingDeque<>();

// 读取的数据会放入该队列

this.elementsQueue = elementsQueue;

this.assignedSplits = new HashMap<>();

this.splitReader = splitReader;

this.errorHandler = errorHandler;

this.shutdownHook = shutdownHook;

this.isIdle = true;

this.wakeUp = new AtomicBoolean(false);

this.closed = new AtomicBoolean(false);

// 对传入的splitReader封装到fetcherTask,以便任务启动的时候直接执行任务

this.fetchTask =

new FetchTask<>(

splitReader,

elementsQueue,

ids -> {

ids.forEach(assignedSplits::remove);

splitFinishedHook.accept(ids);

LOG.info("Finished reading from splits {}", ids);

},

id);

}

@Override

public void run() {

LOG.info("Starting split fetcher {}", id);

try {

while (!closed.get()) {

// 每次循环的距离逻辑

runOnce();

}

} catch (Throwable t) {

errorHandler.accept(t);

} finally {

try {

splitReader.close();

} catch (Exception e) {

errorHandler.accept(e);

}

LOG.info("Split fetcher {} exited.", id);

// This executes after possible errorHandler.accept(t). If these operations bear

// a happens-before relation, then we can checking side effect of errorHandler.accept(t)

// to know whether it happened after observing side effect of shutdownHook.run().

shutdownHook.run();

}

}

/** Package private method to help unit test. */

void runOnce() {

try {

if (shouldRunFetchTask()) {

runningTask = fetchTask;

} else {

runningTask = taskQueue.take();

}

LOG.debug("Prepare to run {}", runningTask);

// 这里运行task,我们下面直接去task中看看具体的操作逻辑即可

if (!wakeUp.get() && runningTask.run()) {

LOG.debug("Finished running task {}", runningTask);

// the task has finished running. Set it to null so it won't be enqueued.

runningTask = null;

checkAndSetIdle();

}

} catch (Exception e) {

throw new RuntimeException(

String.format(

"SplitFetcher thread %d received unexpected exception while polling the records",

id),

e);

}

// If the task is not null that means this task needs to be re-executed. This only

// happens when the task is the fetching task or the task was interrupted.

maybeEnqueueTask(runningTask);

synchronized (wakeUp) {

// Set the running task to null. It is necessary for the shutdown method to avoid

// unnecessarily interrupt the running task.

runningTask = null;

// Set the wakeUp flag to false.

wakeUp.set(false);

LOG.debug("Cleaned wakeup flag.");

}

}

/* 在fetcher创建的时候调用了该方法,或者已经运行之后调用的该方法在上面截图的流程中有代码 */

public void addSplits(List splitsToAdd) {

enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));

wakeUp(true);

}

public void enqueueTask(SplitFetcherTask task) {

synchronized (lock) {

taskQueue.offer(task);

isIdle = false;

}

}

}我们进入fetcherTask中,只看task逻辑即可

代码语言:javascript复制class FetcherTask{

@Override

public boolean run() throws IOException {

try {

if (!isWakenUp() && lastRecords == null) {

// 返回的是该对象 public final class MySqlRecords implements RecordsWithSplitIds

// 调用了我们在创建fetcherTask的时候传入的splitReader对象,实际上还是调用reader的fetch方法来真正的获取数据

lastRecords = splitReader.fetch();

}

if (!isWakenUp()) {

// The order matters here. We must first put the last records into the queue.

// This ensures the handling of the fetched records is atomic to wakeup.

// 将读取的数据放入到队列汇总

if (elementsQueue.put(fetcherIndex, lastRecords)) {

if (!lastRecords.finishedSplits().isEmpty()) {

// The callback does not throw InterruptedException.

splitFinishedCallback.accept(lastRecords.finishedSplits());

}

lastRecords = null;

}

}

} catch (InterruptedException e) {

throw new IOException("Source fetch execution was interrupted", e);

if (isWakenUp()) {

wakeup = false;

}

}

return true;

}

}

通过上面我们基本上已经清楚了在flink层面是怎么最终调用了cdc读取数据的代码,现在我们根据主要的读取代码看看是怎么样子实现的

currentReader.pollSplitRecords() ,我们简单介绍一下currentReader(BinlogSplitReader/SnapshotSplitReader)主要两种实现,大概的思路这里面会根据不同的性质区分进行读取数据,在submitSplit的时候会创建readRask读取指定split的数据,结果会放入StatefulTaskContext的queue中,在fetch方法会先提交split,让其执行read数据,然后通过pollSplitRecords方法在调用queue.poll拉取数据,这是一个阻塞的操作,如果超时则抛出中断异常

代码语言:javascript复制public class MySqlSplitReader implements SplitReader {

private final Queue splits;

private final MySqlSourceConfig sourceConfig;

private final int subtaskId;

@Nullable private DebeziumReader currentReader;

@Nullable private String currentSplitId;

@Override

public RecordsWithSplitIds fetch() throws IOException {

// 执行fetch的时候提前检查一下currentReader,并根据不同的split创建不同的对应的reader,binlog/snapshot

checkSplitOrStartNext();

Iterator dataIt = null;

try {

// 调用具体的debeziumReader执行任务

// 在reader中会调用StatefulTaskContext的queue的poll方法拉取数据,该方法会阻塞(也可以根据时间阻塞),如果时间间隔内没有返回数据则被中断,抛出InterruptedException

dataIt = currentReader.pollSplitRecords();

} catch (InterruptedException e) {

LOG.warn("fetch data failed.", e);

throw new IOException(e);

}

return dataIt == null

? finishedSnapshotSplit() // 如果没有读取到数据则返回一个空的,该方法执行后会将currentSplitId置位null,表示已经该split执行完成

: MySqlRecords.forRecords(currentSplitId, dataIt);

}

@Override

public void handleSplitsChanges(SplitsChange splitsChanges) {

if (!(splitsChanges instanceof SplitsAddition)) {

throw new UnsupportedOperationException(

String.format(

"The SplitChange type of %s is not supported.",

splitsChanges.getClass()));

}

LOG.debug("Handling split change {}", splitsChanges);

splits.addAll(splitsChanges.splits());

}

private void checkSplitOrStartNext() throws IOException {

// the binlog reader should keep alive

if (currentReader instanceof BinlogSplitReader) {

return;

}

if (canAssignNextSplit()) {

final MySqlSplit nextSplit = splits.poll();

if (nextSplit == null) {

throw new IOException("Cannot fetch from another split - no split remaining");

}

currentSplitId = nextSplit.splitId();

if (nextSplit.isSnapshotSplit()) {

if (currentReader == null) {

final MySqlConnection jdbcConnection =

createMySqlConnection(sourceConfig.getDbzConfiguration());

final BinaryLogClient binaryLogClient =

createBinaryClient(sourceConfig.getDbzConfiguration());

final StatefulTaskContext statefulTaskContext =

new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);

currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);

}

} else {

// point from snapshot split to binlog split

if (currentReader != null) {

LOG.info("It's turn to read binlog split, close current snapshot reader");

currentReader.close();

}

final MySqlConnection jdbcConnection =

createMySqlConnection(sourceConfig.getDbzConfiguration());

final BinaryLogClient binaryLogClient =

createBinaryClient(sourceConfig.getDbzConfiguration());

final StatefulTaskContext statefulTaskContext =

new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);

currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);

LOG.info("BinlogSplitReader is created.");

}

// 提交一个split到reader,reader会在在submitSplit方法创建ReadTask对象,进行读取数据,将数据放入StatefulTaskContext的queue中,readTask放入线程池执行任务

currentReader.submitSplit(nextSplit);

}

}

private boolean canAssignNextSplit() {

return currentReader == null || currentReader.isFinished();

}

}

上面基本运行流程已经走通了现在就差实际读取数据的阶段了,现在我们直接跟着代码一点一点走 看看实际的执行逻辑是什么样子的,怎么读取数据的,由于cdc的代码比较多,我们就过滤掉了,binlogReadTask太繁琐,我们就不一步一步讲解了,后面可以简单介绍一下流程看看

我们一次看看对于reader对应的处理逻辑

1.checkSplitWithSplitIds方法在这个方法中最主要的是调用了submitSplit开始我们下面的读取数据的一个流程

代码语言:javascript复制

// ------------------------- SnapshotSplitReader.submitSplit方法 ------------------------------------------

public void submitSplit(MySqlSplit mySqlSplit) {

this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();

statefulTaskContext.configure(currentSnapshotSplit);

// 拿到context的queue,在pollSplitSrecords的时候需要

this.queue = statefulTaskContext.getQueue();

this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();

this.hasNextElement.set(true);

this.reachEnd.set(false);

// 主要读取逻辑在readTask中

this.splitSnapshotReadTask =

new MySqlSnapshotSplitReadTask(

statefulTaskContext.getConnectorConfig(),

statefulTaskContext.getOffsetContext(),

statefulTaskContext.getSnapshotChangeEventSourceMetrics(),

statefulTaskContext.getDatabaseSchema(),

statefulTaskContext.getConnection(),

statefulTaskContext.getDispatcher(),

statefulTaskContext.getTopicSelector(),

StatefulTaskContext.getClock(),

currentSnapshotSplit);

// 提交一个runnable到线程中,主要是执行readTask的execute方法

executor.submit(

() -> {

try {

currentTaskRunning = true;

// 自己实现的contextImpl 主要记录高水位和低水位用

final SnapshotSplitChangeEventSourceContextImpl sourceContext =

new SnapshotSplitChangeEventSourceContextImpl();

// 执行readTask

SnapshotResult snapshotResult =

splitSnapshotReadTask.execute(sourceContext);

final MySqlBinlogSplit backfillBinlogSplit =

createBackfillBinlogSplit(sourceContext);

// optimization that skip the binlog read when the low watermark equals high

// watermark

// 如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的read

final boolean binlogBackfillRequired =

backfillBinlogSplit

.getEndingOffset()

.isAfter(backfillBinlogSplit.getStartingOffset());

if (!binlogBackfillRequired) {

dispatchHighWatermark(backfillBinlogSplit);

currentTaskRunning = false;

return;

}

// snapshot执行完成后,开始binlogReadTask的读取操作

if (snapshotResult.isCompletedOrSkipped()) {

// 根据snapshot read task读取结束后,会记录高低水位,水位线作为参数构建binlog read task

final MySqlBinlogSplitReadTask backfillBinlogReadTask =

createBackfillBinlogReadTask(backfillBinlogSplit);

// 执行binlog read task,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了

// 我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlog read task中,会

// 以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游

backfillBinlogReadTask.execute(

new SnapshotBinlogSplitChangeEventSourceContextImpl());

} else {

readException =

new IllegalStateException(

String.format(

"Read snapshot for mysql split %s fail",

currentSnapshotSplit));

}

} catch (Exception e) {

currentTaskRunning = false;

LOG.error(

String.format(

"Execute snapshot read task for mysql split %s fail",

currentSnapshotSplit),

e);

readException = e;

}

});

}

// ------------------------- MySqlSnapshotSplitReadTask.execute(sourceContext)方法 ------------------------------------------

@Override

public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {

SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);//就是new了一个

final SnapshotContext ctx;

try {

ctx = prepare(context); //重新new了一个 context对象,比较无用

} catch (Exception e) {

LOG.error("Failed to initialize snapshot context.", e);

throw new RuntimeException(e);

}

try {

// 上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可

return doExecute(context, ctx, snapshottingTask);

} catch (InterruptedException e) {

LOG.warn("Snapshot was interrupted before completion");

throw e;

} catch (Exception t) {

throw new DebeziumException(t);

}

}

// ------------------------- MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法 ------------------------------------------

@Override

protected SnapshotResult doExecute(

ChangeEventSourceContext context,

SnapshotContext snapshotContext,

SnapshottingTask snapshottingTask)

throws Exception {

final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =

(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;

ctx.offset = offsetContext;

// 一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了

final SignalEventDispatcher signalEventDispatcher =

new SignalEventDispatcher(

offsetContext.getPartition(),

topicSelector.topicNameFor(snapshotSplit.getTableId()),

dispatcher.getQueue());

// 其实log输出的日志就已经很清晰了

// 记录低水位

final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);

LOG.info(

"Snapshot step 1 - Determining low watermark {} for split {}",

lowWatermark,

snapshotSplit);

((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))

.setLowWatermark(lowWatermark);

signalEventDispatcher.dispatchWatermarkEvent(

snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);

LOG.info("Snapshot step 2 - Snapshotting data");

// 读取数据 主要方法重点介绍的地方

createDataEvents(ctx, snapshotSplit.getTableId());

// 记录高水位

final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);

LOG.info(

"Snapshot step 3 - Determining high watermark {} for split {}",

highWatermark,

snapshotSplit);

signalEventDispatcher.dispatchWatermarkEvent(

snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);

((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))

.setHighWatermark(highWatermark);

return SnapshotResult.completed(ctx.offset);

}

// 我们看看createDataEvents 调用过程

private void createDataEvents(

RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,

TableId tableId)

throws Exception {

EventDispatcher.SnapshotReceiver snapshotReceiver =

dispatcher.getSnapshotChangeEventReceiver();

LOG.debug("Snapshotting table {}", tableId);

createDataEventsForTable(

snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));

// receiver的逻辑我们就不看了,我这里介绍一下就好

// receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!=null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record

// 这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻

snapshotReceiver.completeSnapshot();

}

// createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑

private void createDataEventsForTable(

RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,

EventDispatcher.SnapshotReceiver snapshotReceiver,

Table table)

throws InterruptedException {

long exportStart = clock.currentTimeInMillis();

LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());

// 构建sql

final String selectSql =

StatementUtils.buildSplitScanQuery(

snapshotSplit.getTableId(),

snapshotSplit.getSplitKeyType(),

snapshotSplit.getSplitStart() == null,

snapshotSplit.getSplitEnd() == null);

LOG.info(

"For split '{}' of table {} using select statement: '{}'",

snapshotSplit.splitId(),

table.id(),

selectSql);

try (PreparedStatement selectStatement =

StatementUtils.readTableSplitDataStatement( // 创建statement,然后查询sql

jdbcConnection,

selectSql,

snapshotSplit.getSplitStart() == null,

snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),

snapshotSplit.getSplitEnd(),

snapshotSplit.getSplitKeyType().getFieldCount(),

connectorConfig.getQueryFetchSize());

// 然后对查询出来的数据进行封装成sourceRecord发送下游

ResultSet rs = selectStatement.executeQuery()) {

ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);

long rows = 0;

Threads.Timer logTimer = getTableScanLogTimer();

while (rs.next()) {

rows++;

final Object[] row = new Object[columnArray.getGreatestColumnPosition()];

for (int i = 0; i < columnArray.getColumns().length; i++) {

Column actualColumn = table.columns().get(i);

row[columnArray.getColumns()[i].position() - 1] =

readField(rs, i + 1, actualColumn, table);

}

if (logTimer.expired()) {

long stop = clock.currentTimeInMillis();

LOG.info(

"Exported {} records for split '{}' after {}",

rows,

snapshotSplit.splitId(),

Strings.duration(stop - exportStart));

snapshotProgressListener.rowsScanned(table.id(), rows);

logTimer = getTableScanLogTimer();

}

// 这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解

dispatcher.dispatchSnapshotEvent(

table.id(),

getChangeRecordEmitter(snapshotContext, table.id(), row),// 就是new了一个

snapshotReceiver);

}

LOG.info(

"Finished exporting {} records for split '{}', total duration '{}'",

rows,

snapshotSplit.splitId(),

Strings.duration(clock.currentTimeInMillis() - exportStart));

} catch (SQLException e) {

throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);

}

}

// ------------------------- dispatcher.dispatchSnapshotEvent方法之后的流程 ----------------------------------

// 进入evnentDisptcher.dispatchSnapshotEvent方法

public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {

DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);

if (dataCollectionSchema == null) {

errorOnMissingSchema(dataCollectionId, changeRecordEmitter);

}

changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {

@Override

public void changeRecord(DataCollectionSchema schema,

Operation operation,

Object key, Struct value,

OffsetContext offset,

ConnectHeaders headers)

throws InterruptedException {

eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);

// 真正的放入队列的逻辑在这里调用

// receiver使我们传入的 对应BufferingSnapshotChangeRecordReceiver类

receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);

}

});

}

// BufferingSnapshotChangeRecordReceiver的changeRecord方法

// 前面简单介绍过他的处理逻辑了,就不必多做介绍了

@Override

public void changeRecord(DataCollectionSchema dataCollectionSchema,

Operation operation,

Object key, Struct value,

OffsetContext offsetContext,

ConnectHeaders headers)

throws InterruptedException {

Objects.requireNonNull(value, "value must not be null");

LOGGER.trace("Received change record for {} operation on key {}", operation, key);

if (bufferedEvent != null) {

queue.enqueue(bufferedEvent.get());

}

Schema keySchema = dataCollectionSchema.keySchema();

String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());

// the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks

bufferedEvent = () -> {

SourceRecord record = new SourceRecord(

offsetContext.getPartition(),

offsetContext.getOffset(),

topicName, null,

keySchema, key,

dataCollectionSchema.getEnvelopeSchema().schema(), value,

null, headers);

return changeEventCreator.createDataChangeEvent(record);

};

}

2.pollSplitRecords方法,这个方法是拉取queue中的数据上面把数据读取后写入到queue的流程已经捋清楚了,我们现在看看reader是在什么时候读取了queue的数据

代码语言:javascript复制public Iterator pollSplitRecords() throws InterruptedException {

checkReadException();

if (hasNextElement.get()) {

// data input: [low watermark event][snapshot events][high watermark event][binlog

// events][binlog-end event]

// data output: [low watermark event][normalized events][high watermark event]

boolean reachBinlogEnd = false;

final List sourceRecords = new ArrayList<>();

while (!reachBinlogEnd) {

// 可以看到这里直接queue.poll直接拉取数据即可,在这里会判断一下当前evnet是否是到达了结束的水位线,实际上就是高水位的位置,到达结束水位线之后,我们就可以停止了

List batch = queue.poll();

for (DataChangeEvent event : batch) {

sourceRecords.add(event.getRecord());

if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {

reachBinlogEnd = true;

break;

}

}

}

// snapshot split return its data once

hasNextElement.set(false);

return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)

.iterator();

}

// the data has been polled, no more data

reachEnd.compareAndSet(false, true);

return null;

}

通过上面的阅读我们已经看到了数据从读取到,再到放入队列中的一个过程,这里对队列做一个说明,在上面的介绍中存在两个队列,一个queue,一个elementQueue,这个的区别在于,queue是在读取数据阶段,将数据放入queue,在FetcherTask中调用了reader的fetch方法,将queue中的数据拉取到,并将其加入到elementQueue

在上面的操作中数据已经放入了elementQueue中,现在我们看看elementQueue中的数据是在什么时候发送到下游的

这里我们需要重新回到MysqlSource中通过一张图来看看

我们在创建Reader的时候传入了一个MysqlRecordEmitter,在后面发送数据的时候是通过这个类

对于发送数据到下游的逻辑是在MysqlSourceReader的父类(SourceReaderBase)中,但是发送的类是有Emitter完成的

由于相关方法是有上层调用执行,我们就不多看了,就简单说明一下,系统调用SourceReaderBase.pollNext(),开始触发数据collect的操作,将其发送至下游节点

我们这里直接阅读MysqlRecordEmitter源码看看他发送数据的逻辑,其实到这里跟sourceFucntion实现的原理基本差不多了我们这里简单过一下即可

代码语言:javascript复制public final class MySqlRecordEmitter implements RecordEmitter {

private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =

new FlinkJsonTableChangeSerializer();

private final DebeziumDeserializationSchema debeziumDeserializationSchema;

private final boolean includeSchemaChanges;

private final OutputCollector outputCollector;

public MySqlRecordEmitter(

DebeziumDeserializationSchema debeziumDeserializationSchema,

MySqlSourceReaderMetrics sourceReaderMetrics,

boolean includeSchemaChanges) {

// 对数据Deserialization的一个对象,与单并行度的类是同一个,具体内部逻辑可以自己看

this.debeziumDeserializationSchema = debeziumDeserializationSchema;

this.sourceReaderMetrics = sourceReaderMetrics;

this.includeSchemaChanges = includeSchemaChanges;

this.outputCollector = new OutputCollector<>();

}

@Override

public void emitRecord(SourceRecord element, SourceOutput output, MySqlSplitState splitState)

throws Exception {

// 判断一下消息的事件类型,如果是一个sourceReocrd则发送下游,否则对应其事件的相关操作

if (isWatermarkEvent(element)) {

BinlogOffset watermark = getWatermark(element);

if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {

splitState.asSnapshotSplitState().setHighWatermark(watermark);

}

} else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {

HistoryRecord historyRecord = getHistoryRecord(element);

Array tableChanges =

historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);

TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);

for (TableChanges.TableChange tableChange : changes) {

splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);

}

if (includeSchemaChanges) {

emitElement(element, output);

}

} else if (isDataChangeRecord(element)) {

if (splitState.isBinlogSplitState()) {

BinlogOffset position = getBinlogPosition(element);

splitState.asBinlogSplitState().setStartingOffset(position);

}

reportMetrics(element);

emitElement(element, output);

} else {

// unknown element

LOG.info("Meet unknown element {}, just skip.", element);

}

}

private void emitElement(SourceRecord element, SourceOutput output) throws Exception {

outputCollector.output = output;

// 调用不同的Deserialization.deserialize方法完成数据的转换以及发送到下游

debeziumDeserializationSchema.deserialize(element, outputCollector);

}

private static class OutputCollector implements Collector {

private SourceOutput output;

@Override

public void collect(T record) {

output.collect(record);

}

}

}

四.题外话 代码语言:javascript复制// RowData的DeserializeSchema,对应上面使用到的DeserializeSchema 做一个简单的介绍

public final class RowDataDebeziumDeserializeSchema

implements DebeziumDeserializationSchema {

@Override

public void deserialize(SourceRecord record, Collector out) throws Exception {

Envelope.Operation op = Envelope.operationFor(record);

Struct value = (Struct) record.value();

Schema valueSchema = record.valueSchema();

// 针对不同的操作类型,我们需要器对应的数据

// after表示更改之后的数据结果 before表示更改之前的数据

// 只有update的时候才需要同时使用 before和after(动动聪明脑袋瓜为啥呢)

// 针对不同的操作使用RowKind进行表示,在sql层面会根据数据的标识来进行对应处理,比如insert,比如update操作

// 所以对于table内容,我们只需要将数据转换成对应的RowData类型,并对其表示RowKind类型,框架便会帮我在sink的时候

// 做出对应的操作,我们无需编写相关代码来实现

// 所以 sql中使用 cdc 我们需要将其加上RowKind,对于后面的操作我们就无需关心了

// 我们在sql中 formt格式是json,实际上走的逻辑这里我也没看,因为我不太关心,如果大家关心的话或者想了解的,大家可以自己去看看

// 具体的实现方式

if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {

// 构建RowData的方法,我们有加进来,里面内容比较繁琐

// 讲讲大概内容

// 就是对于我们字段的类型,转换成对应的java中类型,当然这里面我说的是flink中,因为他对用到的类型都重新做了一层封装,这样才能支持我们db中的所有类型

GenericRowData insert = extractAfterRow(value, valueSchema);

validator.validate(insert, RowKind.INSERT);

insert.setRowKind(RowKind.INSERT);

// 具体的发送下游方法

emit(record, insert, out);

} else if (op == Envelope.Operation.DELETE) {

GenericRowData delete = extractBeforeRow(value, valueSchema);

validator.validate(delete, RowKind.DELETE);

delete.setRowKind(RowKind.DELETE);

emit(record, delete, out);

} else {

GenericRowData before = extractBeforeRow(value, valueSchema);

validator.validate(before, RowKind.UPDATE_BEFORE);

before.setRowKind(RowKind.UPDATE_BEFORE);

emit(record, before, out);

GenericRowData after = extractAfterRow(value, valueSchema);

validator.validate(after, RowKind.UPDATE_AFTER);

after.setRowKind(RowKind.UPDATE_AFTER);

emit(record, after, out);

}

}

}

对于我们想实现一个tableSource的话,我们需要继承DynamicTableSourceFactory,实现下面的方法,然后通过spi的方法将其动态的加载

createDynamicTableSource : 创建一个具体的sourcefactoryIdentifier : 可以认为是我们描述connector的名字,比如kafkarequiredOptions : with后面所必须要有参数,比如 username,password等,如果没有抛出异常optionalOptions : with后面的配置项,这是可选择的可有可无

相关推荐

QQ异地登陆,如何快速挂出常用IP地址?
mobile 365365051

QQ异地登陆,如何快速挂出常用IP地址?

📅 10-21 👁️ 9824
正宗黄焖鸡排行榜:黄焖鸡千万家,你最爱的是哪家?
你我贷还清后多久能再借款?3个关键点解析