如何基于日志,同步实现数据的一致性和实时抽取?

  • 时间:
  • 浏览:1
  • 来源:5分排列5_5分排列3

累似 :000102000012345678。 103 是日志文件号,12345678 是日志偏移量。

kafka一般只保存若干天的信息,太大保存完整篇 信息,而HDFS中还前要保存所有的历史增完整篇 的信息。这就使得只是 事情变为将会:

将会每次写的Parquet完整篇 回会小文件,我们都都我们都都知道HDFS对于小文件性能暂且好,随后 另外还另有一个job,每天定时将哪些的Parquet文件进行合并成大文件。

事情是从公司前段时间的需求说起,我们都都我们都都知道宜信是一家金融科技公司,我们都都我们都都的只是 数据与标准互联网企业不同,大致来说只是 我:

而binlog有并与否模式:

Granfana提供的是并与否实时监控能力。

常常我们都都我们都都遇到的需求是,将数据经过加工落地到数据库或HBase中。没办法 这里涉及到的另有一个间题只是 我,哪些样的数据还前要被更新到数据?

对于增量抽取,我们都都我们都都使用的是MySQL的日志文件号 + 日志偏移量作为唯一id。Id作为64位的long整数,高7位用于日志文件号,低12位作为日志偏移量。

我们都都我们都都个人的优缺点如下:

将会时间关系,我今天主要介绍DWS中的Dbus和Wormhole,在前要的随后 附带介绍一下Swifts。

图中是某业务系统的实时监控信息。上边是实时流量情形,下面是实数率时情形。还前要看得人,实时性还是很不错的,基本上1~2秒数据就将会到末端kafka中。

全量抽取,我们都都我们都都借鉴了Sqoop的思想。将全量抽取Storm分为了2 个要素:

通过Wormhole Wpark Streaming程序运行消费kafka的UMS,首先UMS log还前要被保存到HDFS上。

下面是具体的分片策略:

读取binlog的方案比较多,github上不少,参考https://github.com/search?utf8=%E2%9C%93&q=binlog。最终我们都都我们都都确定了阿里的canal做位日志抽取方。

说完Dbus,该说一下Wormhole,为哪些另有一个项目完整篇 回会另有一个,而要通过kafka来对接呢?

B:处于的数据,比较_ums_id_, 最终只将哪些_ums_id_更新较大row到目标数据库,小的直接离开。

将会statement 模式的缺点,在与我们都都我们都都的DBA沟通过程中了解到,实际生产过程中都使用row 模式进行克隆技术。这使得读取全量日志成为将会。

全量抽取的Storm程序运行是读取kafka的分片信息,采用多个并发度并行连接数据库备库进行拉取。将会抽取的时间将会很长。抽取过程中将实时情形写到Zookeeper中,便于心跳程序运行监控。

将会出先延时,则是通过dbus的心跳模块发送邮件报警或短信报警。

将会将会插入的_ums_id_比较大,是删除的数据(表明有些数据将会删除了), 将会完整篇 回会软删除,此时插入另有一个_ums_id_小的数据(旧数据),就会真的插入进去。

我们都都我们都都知道,嘴笨 MySQL InnoDB有个人的log,MySQL主备同步是通过binlog来实现的。如下图:

介于Spark原生对parquet支持的很好,Spark SQL太大 对Parquet提供很好的查询。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的内容是所有log的增完整篇 信息以及_ums_id_,_ums_ts_都存下来。

把增量的Log作为一切系统的基础。后续的数据使用方,通过订阅kafka来消费log。

整个系统涉及到数据库的主备同步,Canal Server,多个并发度Storm程序运行等各个环节。

具体思路是:

Canal最早被用于阿里中美机房同步, canal原理相对比较简单:

哪些方案完整篇 回会算完美。我们都都我们都都在了解和考虑了不同实现办法后,最后借鉴了 linkedin的思想,认为要想一块儿处理数据一致性和实时性,比较合理的办法应该是来自于log。

这里补充说一下Swifts的作用:

难能可贵要软删除和加入_is_active_列,是为了只是 我并与否情形:

对于第六个间题,就涉及到_ums_id_了,将会我们都都我们都都将会保证了_ums_id_大的值更新,随后 在找到对应数据行后,根据有些原则来进行替换更新。

应用了DWS随后 ,借款人将会填写的信息将会记录到数据库中,并通过DWS实时的进行抽取、计算和落地到目标库中。根据对客户的打分,评价出优质客户。随后 立刻将有些客户的信息输出到客服系统中。

嘴笨 _ums_ts_与_ums_id_意图是累似 的,只不过有随后 _ums_ts_将会会重复,即在1毫秒中处于了多个操作,只是 我就得靠比较_ums_id_了。

(此图来自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)

我们都都我们都都数据使用方的数据来自多个系统,随后 是通过T+1的办法获得报表信息,随后 指导第五六天的运营,只是 我时效性很差。

这里最重要的另有一个原则只是 我数据的幂等性。

说了没办法 多,DWS哪些实际运用呢?下面我来介绍某系统使用DWS实现了的实时营销。

将会没办法 实时抽取/计算/落库的能力,没办法 有些切都无法实现。

过去的通用做法有几种,分别是:

对于增量的log,通过订阅Canal Server的办法,我们都都我们都都得到了MySQL的增量日志:

玩数据的人都知道数据是非常有价值的,随后 哪些数据是保处于各个系统的数据库中,如保让前要数据的使用方得到一致性、实时的数据呢?

随后 对流程的监控和预警就尤为重要。

下面解释一下DWS平台,DWS平台是有六个子项目组成:

借款人通过Web或手机APP在系统A中填写信用信息时,将会会有些由于无法继续,嘴笨 将会有些借款人是另有一个优质潜在客户,但随后 将会无法或随后 太大 知道有些信息,只是 实际上只是 我的客户是流失了。

Jdbc的插入数据:插入数据到数据库中,保证幂等的原理嘴笨 简单,要想提高性能在实现上就变得冗杂只是 ,总太大要 根小根小的比较随后 在插入或更新。

确定Spark的理由是很充分的:

Wormhole spark streaming根据namespace 将数据分布存储到不同的目录中,即不同的表和版本装进不同目录中。

为哪些不使用dual write(双写)呢?,请参考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

无论是增量还是全量,最终输出到kafka中的消息完整篇 回会我们都都我们都都约定的另有一个统一消息格式,称为UMS(unified message schema)格式。

系统A的数据都保存到个人的数据库中,我们都都我们都都知道,宜信提供只是 金融服务,其中包括借款,而借款过程中怪怪的要的只是 我信用审核。

对于第另有一个间题,嘴笨 就前要定位数据要找另有一个唯一的键,常见的有:

还前要说HDFS中的日志是只是 的事情基础。

Dbus 的MySQL版主要处理方案如下:

从提高性能的深度,我们都都我们都都还前要将整个Spark Streaming的Dataset集合直接插入到HBase,不前要比较。让HBase基于version自动替我们都都我们都都判断哪些数据还前要保留,哪些数据不前要保留。

通过心跳模块,累似 每分钟(可配置)对每个被抽取的表插入根小心态数据并保存发送时间,有些心跳表也被抽取,跟随着整个流程下来,与被同步表在实际上走相同的逻辑(将会多个并发的的Storm将会有不同的分支),当收到心跳包的随后 ,即便没办法 任何增完整篇 的数据,太大 证明整条链路是通的。

在整个数据传输中,为了尽量的保证日志消息的顺序性,kafka我们都都我们都都使用的是另有一个partition的办法。在一般情形下,基本上是顺序的和唯一的。

图片来自:https://github.com/alibaba/canal

在考虑并发情形下,插入和更新都将会出先失败,没办法 还有考虑失败后的策略。

我们都都我们都都知道Spark的RDD/dataset完整篇 回会以集合的办法来操作以提高性能,同样的我们都都我们都都前要以集合操作的办法实现幂等性。

通过DWS,将数据从多个系统中实时抽取,计算和落地,并提供报表展示,使得运营还前要及时作出部署和调整,快速应对。

对于全量抽取,_ums_id_是唯一的,从zk中每个并发度分别取不同的id片区,保证了唯一性和性能,填写负数,太大与增量数据冲突,也保证我们都都我们都都是早于增量消息的。

对于initial load(第一次加载),同样开发了全量抽取Storm程序运行通过jdbc连接的办法,从源端数据库的备库进行拉取。initial load是拉完整篇 数据,只是 我们都都我们都都推荐在业务低峰期进行。好在只做一次,不前要每天都做。

在考虑使用Storm作为处理方案的随后 ,我们都都我们都都主只是 我认为Storm有以下优点:

对于无法插入有些情形(比如目标系统有间题),Wormhole还有重试机制。插入到有些存储中的就太大介绍了,总的原则是:根据个人存储自身底部形态,设计基于集合的,并发的插入数据实现。哪些完整篇 回会Wormhole为了性能而做的努力,使用Wormhole的用户暂且关心 。

使用Spark的同学都知道,RDD/dataset完整篇 回会还前要partition的,还前要使用多个worker并进行操作以提高数率。

考虑到数据安全性,对于有脱敏需求的场景,Dbus的全量storm和增量storm程序运行也完成了实时脱敏的功能。脱敏办法有3种:

如前面所说,Dbus主要处理的是将日志从源端实时的抽出。 这里我们都都我们都都以MySQL为例子,简单说明如保实现。

客服人员在很短的时间(几分钟以内)就通过打电话的办法联系上有些借款人(潜客),进行客户关怀,将有些潜客转换为真正的客户。我们都都我们都都知道借款是有时效性的,将会时间太大就没办法 价值了。

无论是遇到增完整篇 任何的数据,我们都都我们都都面临的间题完整篇 回会:

插入数据到Hbase中,相当要简单有些。不同的是HBase还前要保留多个版本的数据(当然也还前要只保留另有一个版本)默认是保留六个版本;

比如:

此处来自:http://www.jquerycn.cn/a_13625

Wormhole和Swifts对比如下:

随后 我们都都我们都都知道写kafka会失败,有将会重写,Storm也用重做机制,随后 ,我们都都我们都都暂且严格保证exactly once和完整篇 的顺序性,但保证的是at least once。

A:不处于的数据,即这要素数据insert就还前要;

比如:将会别的worker将会插入,没办法 将会唯一性约束插入失败,没办法 前要改为更新,前要比较_ums_id_看是与否太大 更新。

这里就太大做解释了。

于是我们都都我们都都提出了构建另有一个基于log的公司级的平台的想法。

UMS中支持的数据类型,参考了Hive类型并进行冗杂,基本上含高了所有数据类型。

Version的确定很有意思,利用_ums_id_的唯一性和自增性,与version自身的比较关系一致:即version较大等价于_ums_id_较大,对应的版本较新。

如图所示,Wormhole 还前要将kafka中的UMS 落地到各种系统,目前用的最多的HDFS,JDBC的数据库和HBase。

通常我们都都我们都都的MySQL布局是采用 另有一个master主库(vip)+ 另有一个slave从库 + 另有一个backup容灾库 的处理方案,将会容灾库通常是用于异地容灾,实时性不高只是 我便于部署。

作者:王东

在Wormhole中,根小flow是指从另有一个namaspace从源端到目标端。另有一个spark streaming服务于多条flow。

如上图所示:

其中很大另有一个由于只是 我解耦,kafka具有天然植物的解耦能力,程序运行直接还前要通过kafka做异步的消息传递。Dbus和Wornhole內部也使用了kafka做消息传递和解耦。

图中:

数据分片前要考虑分片列,按照配置和自动确定列将数据按照范围来分片,并将分片信息保存到kafka中。

嘴笨 UMS的结果还前要直接订阅,但还前要开发的工作。Wormhole处理的是:提供一键式的配置,将kafka中的数据落地到各种系统中,让没办法 开发能力的数据使用方通过wormhole来实现使用数据。

另外另有一个实时报表的应用如下:

只是 我,从日志层面保证了物理唯一性(即便重做也有些id号只是 我变),一块儿也保证了顺序性(还能定位日志)。通过比较_ums_id_ 消费日志就能通过比较_ums_id_知道哪条消息更新。

这就由于旧数据被插入了。不幂等了。只是 被删除的数据依然保留(软删除)是有价值的,它能被用于保证数据的幂等性。

借款人前要提供证明具有信用价值的信息,比如央行征信报告,是具有最强信用数据的数据。 而银行流水,网购流水也是具有较强的信用属性的数据。

为了最小化对源端产生影响,显然我们都都我们都都读取binlog日志应该从slave从库读取。

总结一下:简单的说,Dbus只是 我将各种源的数据,实时的导出,并以UMS的办法提供订阅, 支持实时脱敏,实际监控和报警。

随后 插入数据到HBase,前要处理的间题是:

另外另有一个由于只是 我,UMS是自描述的,通过订阅kafka,任何有能力的使用方来直接消费UMS来使用。

在技术栈上, wormhole确定使用spark streaming来进行。

Storm程序运行和益跳程序运行将数据发送公共的统计topic,再由统计程序运行保存到influxdb中,使用grafana进行展示,就还前要看得人如下效果:

每个Parquet文件目录都含高文件数据的起始时间和随后 开始时间。只是 我在回灌数据时,还前要根据确定的时间范围来决定前要读取哪些Parquet文件,暂且读取完整篇 数据。

为哪些使用log和kafka作为基础,而不使用Sqoop进行抽取呢? 将会:

来源:宜信技术学院

消息中schema要素,定义了namespace 是由 类型+数据源名+schema名+表名+版本号+分库号+分表号 太大 描述整个公司的所有表,通过另有一个namespace就能唯一定位。

对于流水表,有增量要素就够了,随后 有些表前要知道最初(已处于)的信息。这随后 我们都都我们都都前要initial load(第一次加载)。

payload是指具体的数据,另有一个json包上边还前要含高1条至多条数据,提高数据的有效载荷。

如下图所示:



图片来自:https://github.com/alibaba/canal

随后 _ums_id_变得尤为重要。