Apache Flink结合Kafka构建端到端的Exactly-Once处理

本文翻译自:https://data-artisans.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

Apache Flink自2017年12月发布的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira)。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持一些数据源(source)和输出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一个抽象层,用户只需要实现少数方法就能实现端到端的Exactly-Once语义。

有关TwoPhaseCommitSinkFunction的使用详见文档: TwoPhaseCommitSinkFunction。或者可以直接阅读Kafka 0.11 sink的文档: kafka

接下来会详细分析这个新功能以及Flink的实现逻辑,分为如下几点。

  • 描述Flink checkpoint机制是如何保证Flink程序结果的Exactly-Once的
  • 显示Flink如何通过两阶段提交协议与数据源和数据输出端交互,以提供端到端的Exactly-Once保证
  • 通过一个简单的示例,了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的文件输出

Apache Flink应用程序中的Exactly-Once语义

当我们说『Exactly-Once』时,指的是每个输入的事件只影响最终结果一次。即使机器或软件出现故障,既没有重复数据,也不会丢数据。

Flink很久之前就提供了Exactly-Once语义。在过去几年中,我们对Flink的checkpoint机制有过深入的描述,这是Flink有能力提供Exactly-Once语义的核心。Flink文档还提供了该功能的全面概述

在继续之前,先看下对checkpoint机制的简要介绍,这对理解后面的主题至关重要。

一次checkpoint是以下内容的一致性快照:

  • 应用程序的当前状态
  • 输入流的位置

Flink可以配置一个固定的时间点,定期产生checkpoint,将checkpoint的数据写入持久存储系统,例如S3或HDFS。将checkpoint数据写入持久存储是异步发生的,这意味着Flink应用程序在checkpoint过程中可以继续处理数据。

如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint保存的位置,然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。

在Flink 1.4.0之前,Exactly-Once语义仅限于Flink应用程序内部,并没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互,开发人员需要有能力自己维护组件的上下文来保证Exactly-Once语义。

为了提供端到端的Exactly-Once语义 – 也就是说,除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制来协调。

分布式系统中,协调提交和回滚的常用方法是两阶段提交协议。在下一节中,我们将讨论Flink的TwoPhaseCommitSinkFunction是如何利用两阶段提交协议来提供端到端的Exactly-Once语义。

Flink应用程序端到端的Exactly-Once语义

我们将介绍两阶段提交协议,以及它如何在一个读写Kafka的Flink程序中实现端到端的Exactly-Once语义。Kafka是一个流行的消息中间件,经常与Flink一起使用。Kafka在最近的0.11版本中添加了对事务的支持。这意味着现在通过Flink读写Kafaka,并提供… --> 阅读全文

Hadoop : 一个目录下的数据只由一个map处理

有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。

刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。

或者是把目录写在文件里面,作为输入:

/path/to/directory1
/path/to/directory2
/path/to/directory3

代码里面按行读取:


都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。

Mac下实现ssh自动登录和主机管理

在从Windows系统迁移到Mac后,ssh登录用的不太顺手。由于用惯了SecureCRT的自动登录和主机管理功能,而正版的SecureCRT for Mac OS X太贵了(99$),秉承着Mac下不用D版软件的想法,一直在寻找可以替代SecureCRT的方法。

Mac下没有类似SecureCRT的软件,自带的终端功能太弱了,就改用了Iterm2
Iterm2确实比较好用:

    支持多tab
      Cmd+left arrow 切换到左边tab
      Cmd+right arrow 切换到邮编tab
      cmd+数字 直接切换到某个tab
      Cmd-Option-E 显示所有tab,以便搜索导航
    可以将窗口水平、垂直布局,分成多个pane
      Cmd-D 垂直切分
      Cmd-Shift-D 水平切分
      Cmd-[ 和 Cmd-] 在不同pane中切换

但是Iterm2也不能像SecureCRT那样方便管理主机和自动登录,只能通过脚本来解决了。在网上找到用expect做自动登录的项目ssh-auto-login,和一个用python写的主机管理程序sshgo,发现把两个工程合并在一起,刚好可以实现SSH自动登录和主机管理功能。

ssh-auto-login其实就是用expect实现自动登录功能。expect的用法如下:

sshgo这个项目是用python脚本写了主机管理的界面,选择相应主机后,会执行ssh命令。我改了下,在选择主机后不是简单的执行ssh,而是调用expect脚本,这样就能免输密码访问了。

感谢Github和两个项目的作者,我只是把两个的功能合在一起而已。修改后的代码放在了github(ssh-auto-login-manage)上,项目的描述为:

SSH auto login in without password and managing hosts

--> 阅读全文

一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat

某日,接手了同事写的从Hadoop集群拷贝数据到另外一个集群的程序,该程序是运行在Hadoop集群上的job。这个job只有map阶段,读取hdfs目录下数据的数据,然后写入到另外一个集群。

显然,这个程序没有考虑大数据量的情况,如果输入目录下文件很多或数据量很大,就会导致map数很多。而实际上我们需要拷贝的一个数据源就有近6T,job启动起来有1w多个map,一下子整个queue的资源就占满了。虽然通过调整一些参数可以控制map数(也就是并发数),但是无法准确的控制map数,而且换个数据源又得重新配置参数。

第一个改进的版本是,加了Reduce过程,以期望通过设置Reduce数量来控制并发数。这样虽然能精确地控制并发数,但是增加了shuffle过程,实际运行中发现输入数据有倾斜(而partition的key由于业务需要无法更改),导致部分机器网络被打满,从而影响到了集群中的其他应用。即使通过 mapred.reduce.parallel.copies 参数来限制shuffle也是治标不治本。这个平白增加的shuffle过程实际上浪费了很多网络带宽和IO。

最理想的情况当然是只有map阶段,而且能够准确的控制并发数了。

于是,第二个优化版本诞生了。这个job只有map阶段,采用CombineFileInputFormat,它可以将多个小文件打包成一个InputSplit提供给一个Map处理,避免因为大量小文件问题,启动大量map。通过 mapred.max.split.size 参数可以大概地控制并发数。本以为这样就能解决问题了,结果又发现了数据倾斜的问题。这种粗略地分splits的方式,导致有的map处理的数据少,有的map处理的数据多,并不均匀。几个拖后退的map就导致job的实际运行时间长了一倍多。

看来只有让每个map处理的数据量一样多,才能完美的解决这个问题了。

第三个版本也诞生了,这次是重写了CombineFileInputFormat,自己实现getSplits方法。由于输入数据为SequenceFile格式,因此需要一个SequenceFileRecordReaderWrapper类。

实现代码如下:
CustomCombineSequenceFileInputFormat.java

MultiFileInputFormat.java

通过 multifileinputformat.max_split_num 参数就可以较为准确的控制map数量,而且会发现每个map处理的数据量很均匀。至此,问题总算解决了。

hbase的行锁与多版本并发控制(MVCC)

MVCC (Multiversion Concurrency Control),即多版本并发控制技术,它使得大部分支持行锁的事务引擎,不再单纯的使用行锁来进行数据库的并发控制,取而代之的是,把数据库的行锁与行的多个版本结合起来,只需要很小的开销,就可以实现非锁定读,从而大大提高数据库系统的并发性能。

HBase正是通过行锁+MVCC保证了高效的并发读写。

为什么需要并发控制

HBase系统本身只能保证单行的ACID特性。ACID的含义是:

  • 原子性(Atomicity)
  • 一致性(Consistency)
  • 隔离性(Isolation)
  • 持久性(Durability)

传统的关系型数据库一般都提供了跨越所有数据的ACID特性;为了性能考虑,HBase只提供了基于单行的ACID。

下面是一个hbase并发写的例子。

原始数据如下
mvcc

Apache HBase Write Path一文可以知道hbase写数据是分为两步:
1. 写Write-Ahead-Log(WAL)文件
2. 写MemStore:将每个cell[(row,column)对]的数据写到内存中的memstore

写写同步

假定对写没有采取并发控制,并考虑以下的顺序:

mvcc

最终得到的结果是:

mvcc

这样就得到了不一致的结果。显然我们需要对并发写操作进行同步。
最简单的方式是提供一个基于行的独占锁来保证对同一行写的独立性。所以写的顺序是:

  • (0) 获取行锁
  • (1) 写WAL文件
  • (2) 更新MemStore:将每个cell写入到memstore
  • (3) 释放行锁

读写同步

尽管对并发写加了锁,但是对于读呢?见下面的例子:
mvcc

如果在上面的图中红线所示的地方进行读操作,最终得到的结果是:
mvcc

可见需要对读和写也进行并发控制,不然会得到不一致的数据。最简单的方案就是读和写公用一把锁。这样虽然保证了ACID特性,但是读写操作同时抢占锁会互相影响各自的性能。

MVCC算法

HBase采用了MVCC算法来避免读操作去获取行锁。

对于写操作:

  • (w1) 获取行锁后,每个写操作都立即分配一个写序号
  • (w2) 写操作在保存每个数据cell时都要带上写序号
  • (w3)
--> 阅读全文