地址 : 上海浦东新区康桥路957弄D做2020室《万象》杂志社
投稿邮箱 : bianji7@126.com
李子乾 朱青 徐雨申
摘 要:随着市场上自主研发的数据库的大量出现,面对各具特色的数据库因差异性而导致的数据etl过程困难的问题,本文探讨了一种通过kafka作为可靠的实时数据中转,然后通过spark streaming任务来实现数据入库的技术路线,最终解决了基于电力客服业务的数据仓库实时数据接入的问题。该技术路线能够一定程度上解决一部分数据实时数据接入的困难。与此同时,可以在数据接入过程中进行复杂的数据流式计算。
关键词:实时数据接入;kafka offset;spark streaming
中图分类号:TM769 文献标识码:A 文章编号:1671-2064(2020)01-0038-03
1 背景与问题
现如今大数据技术发展日新月异,数据处理框架及方法也与日俱增,但是,数据集成的接入方法缺存在一定短板,如在满足实时数据接入的需求上存在一定的难度及不实用性。伴随着电力客服业务体量的扩展及海量业务数据的不断增加,数据接入的实时性问题成了阻碍进一步数据分析应用的绊脚石,因此,一种数据仓库实时数据接入方法应运而生,本文将主要针对电力客服业务数据的数据仓库实时接入方法进行的研究及论证。
在数据仓库建设过程,一般数据仓库的技术选型大多采用了mpp集群。基于云计算理念的并行数据库集群,能够支持TB到PB级别的结构化数据存储、高效查询[3]。电力客服数据仓库采用了分层的架构,在最接近原始业务系统数据的一层称为数据明细层,详细数据仓库架构如图1所示。
将业务数据从数据贴源区接入到数据仓库明细层的过程中,需要实现部分结果化数据实时接入。在传统的关系型数据库中,实时数据接入常使用oracle goldengate(下简称为ogg)来进行数据接入。所以,首先考虑使用ogg来进行实时数据接入。ogg出于对数据可靠性的保证,在数据接入的过程中,需要创建checkpoint来记录数据接入任务的执行状态,以保证如果接入进行中断的时候,可以通过读取checkpoint的状态数据来恢复接入进程中断前的状态。ogg创建的checkpoint的主键是包含4个字段,而mpp集群支持数据表的最大联合主键数为3。因此,mpp集群作为ogg数据接入的目的端的时候,无法正常使用检查点功能,数据传输的可靠性收到了巨大的影响。
本文在接下来的篇幅中,将详细介绍,一种可以解决可靠实时数据传输的技术。
2 解决办法
为了解决在mpp集群中无法创建checkpointtable的问题。首先尝试,通过手动的方式来创建checkpointtable,但是自行创建的表,ogg数据抽取进程无法识别。
在经过一些方式的尝试后,决定通过在源端和目标端添加中转的方式来实现mpp集群中的实时数据接入。
在仔细分析了ogg的checkpoint table的实现机制后,发现其与kafka的offset机制十分类似。在kafka中,offset是一个用于存储每个消息被追加到分区的序列号的变量,offset的值是随着消息的消費情况不断更新的。[1]
具体来说,kafka中的Offset分为两种:Current Offset和Committed Offset。
Current Offset保存在消费者侧,表示consumer消费者已经接收的消息序号。举个例子来说,consumer目前接收了10条消息,则当前current offset的值为10。于是消费者下一次消费的时候,就会从第11条消息开始,这样可以避免每次消费者从topic中获取消息的时候可以避免重复。
而commited offset保存在broker上,表示consumer消费者消费过的消息序号。举例来说,消费者接收了10条数据,此时消费者这边的current offset是10。但是消费者接收到消息后,是否真正意义上消费了该条消息是不确定的。这里就涉及到kafka的可靠信息传输机制,kafka的消息在被消费者消费后,是需要消费者反馈和同步消费情况的。这一特性实现的机制是依靠commitSync和commitAsync两个方法来实现的。当消费者接收到topic的消息后,current offset会立刻更新到最新的消息序号,然后消费者拿到接收到的数据后,开始进行消费也就是计算和处理。完成消费过程后,就会通过调用commitSync和commitAsync将消息的消费情况返回给topic,broker在接收到commitSync和commitAsync信号后,会将commited offset更新为最新的序号。表示当前已确认消费的序号。
如图2所示,committed offset为3,current offset为5。这表明当前时间,消费者虽然接收到了5条消息,但是第4条与第5条并未被消费,已经消费的消息才到第三条,当第四条和第五条消费之后,消费者会返回一个同步信号给broker,然后committed offset才会更新。而与此同时,current offset会不断增长,消费者接收数据的过程是不会停止的。与消费是同时进行的,两者并不干扰,但current offset始终比committed offset要大。
Committed offset在broker一端是单独由一个topic来记录和管理的。当其更新的时候,最新的commited offset就会被写入__consumer_offsets的topic中。这样当kafka出现进程意外停止或者是consumer group成员出现变化,需要consumer rebalance的时候,commited offset就可以保证新的Consumer能够从正确的位置开始消费一条消息,从而避免重复消费。这样的话,就可以实现类似于ogg中的checkpoint table的功能。[1]
3 接入设计
基于上述的kafka offset机制,可以完美地解决mpp集群无法创建检查点的问题。首先将原始贴源区数据通过ogg的方式实时接入到kafka中,然后通过spark streaming程序订阅kafka中的消息,将ogg格式的消息转换处理成mpp格式数据,最终实现数据仓库实时数据接入。
3.1 oracle实时数据接入kafka
在ogg将数据接入到kafka过程中,首先需要通过抽取进程,将数据抽取放进本地指定的数据文件队列,然后通过投递进程,将数据文件传送到目的端,目的端ogg客户端在接收到数据文件后,将文件放入指定的远程数据队列中,然后通过复制进程,将数据文件解析后,以生产者的方式将数据发布到kafka的topic中。Oracle goldengate原理详细情况如图3所示。
3.1.1 源与目标端配置管理进程
在源端和目標端完成ogg的安装后,两端都需要配置mgr管理进程。Manager进程是ogg的控制进程,运行在源端和目标端上。它主要作用有以下几个方面:启动、监控、重启Goldengate的其他进程,报告错误及事件,分配数据存储空间,发布阀值报告等。
在管理进程中需要配置的有:ogg进程的监听端口;出了指定的固定端口以外还需要指定一系列的动态端口列表,当指定的默认的监听端口不可用时,会在动态端口中随机挑选一个作为进程的监听端口;另外需要设置自动重启的参数用于管理进程下某个进程中断或者是重启管理进程,管理进程会自动重启这些进程,设定重启的最大次数以及时间间隔;此外还需要设定定期清理ogg的传输数据文件的周期。
3.1.2 oracle源端配置抽取进程以及投递进程
在源端配置抽取进程,将需要进行实时传输的数据表配置进抽取进程。
配置过程中需要配置动态解析源端数据表;需要设置环境变量,指定源端数据库以及字符集以及连接源端数据库的密码;然后指定数据库中抽取出的数据的保存位置以及文件名;最后配置需要复制表的清单。
在配置完抽取进程后,需要配置相配合的投递进程,用于将抽取出来的队列文件发送到指定的目的端服务器上。在配置过程重要配置参数有,除了同样需要配置禁止ogg与oracle交互以及动态解析以外,还需要配置远程目标端的ip地址和目的端的管理进程的监听端口,用于构建点到点的数据传输通道,同样需要配置目的端用于存放传输的队列文件的路径。
配置好抽取和投递进程后,需要分别将本地队列文件路径和目标端的队列文件路径和抽取进程进行绑定。
3.1.3 配置数据表define文件
ogg在传输数据的过程中,需要将传输的数据表的详细定义信息发送到目标端。
首先,配置一个需要导出表定义文件的表清单,然后在在ogg根目录下调用defgen指令执行对应的配置文件,即可自动生成数据表定义文件,然后将表定义文件发送到目的端的指定目录下。
3.1.4 kafka目标端配置检查点
kafka端配置好检查点后,会自动记录数据同步的当前进度,当程序中断恢复的时候,将从检查点表中记录的最新状态还原。
3.1.5 kafka目标端配置复制进程
源端通过抽取进程从数据表中抽取了数据文件,然后通过投递进程发送到目的端。目的端接收到数据文件后,需要使用复制进程,将接收的数据文件插入到目标端指定的数据库中,此处即将数据复制进kafka中,复制进程中设计详细的参数有:
复制进程中需要指定从源端服务器上传输过来的表定义文件;然后定义kafka的详细配置,以及对数据进入kafka指定了固定格式以及方式和复制任务的报告生成频率;然后需要设置复制进程以事务传输时,事务合并的单位,用这种方式来减少IO操作;最后需要详细配置源端与目标端的映射关系。
配置好复制进程后,同样需要将队列文件路径绑定到复制进程上。
3.1.6 配置kafka.props
此处主要需要配置的参数为topicname,表示数据进入的topic名称,以及数据进入topic的指定数据格式。
3.1.7 启动全部进程
将上述配置好的抽取,投递,复制进程逐一启动,启动任务后,数据将实时从源端数据库接入到kafka中。
3.2 kafka数据实时接入mpp集群
将kafka topic中数据接入到数据仓库中,首先需要将kafka的数据取出,需要定义一个消费者来消费topic中数据。然后在消费者的程序中将kafka中的数据解析成可以加载进入mpp集群的的数据格式,可以解析成纯数据文件,也可以直接将数据换成sql语句。
首先,需要通过sparkStreaming来读取kafka的数据,这里存在两种模式,是receiver-base和direct。两者存在一定的区别。
Receiver模式首先创建一个receiver也就是接收器从kafka接收数据并存储在Spark executor中,然后用触发的方式去处理接收到的数据。为了不丢数据,需要开启WAL机制,这会将receiver接收到的数据写一份备份到其他的存储组件中去。
Direct模式,是定期查询kafka中的每个partition的最新的offset,每个批次拉取上次处理的offset和当前查询的offset的范围的数据进行处理。这种模式为了保证数据传输的可靠性,offset是需要手动保存的。
这里,本文介绍的方法中,选用了direct模式来消费kafka。
在配置好连接kafka的功能后,将每个接受到的消息转化为sparkStreaming rdd任务,对每个rdd任务进行数据的解析。[4]
由于kafka中保存的ogg同步数据格式可以解析成json格式,获取到json数据后,可以通过数据中的操作类型来将数据还原成sql。
在完成sql的解析还原后,只需要通过执行jdbc去执行该条sql,就可以完成数据的入库。
在整个过程中,通过创建streamingContext类来实现数据流的监听功能。[5]
这里通過终端等待的方式,来监听kafka中的数据,当kafka中产生新的数据时,kafka就会给spark streaming发送signal,而spark程序则会以响应的方式,立刻去消费kafka中新产生的数据。[2]但是这样的过程是存在一定的延迟的。在实际的测试过程中,这样的延迟可能会达到5s左右,对于数据仓库实时数据接入的需求来看,这样程度的延迟是可以接受的。
综上,上述过程即为数据仓库实时数据接入的一种实现方式。
4 结语
本文详细介绍了,在传统的实时数据传输工具(ogg)存在一定局限性的时候,首先分析了限制工具的影响因素,从中分析出了根本原因,由于mpp集群的主键数量限制,采用将数据先实时传输到kafka中,并且通过kafka实现了数据的可靠传输,然后通过spark Streaming实现了响应式的数据入库过程。最终以几秒的延迟代价,解决了数据仓库mpp集群的实时数据接入问题。
从这个技术路线向外延展,通过kafka中转,再到spark Streaming流处理。该条技术实现完全可以实现更多的数据处理与计算。通过spark Streaming,kafka中的数据完全可以流向更多的数据组件而不仅仅局限于本文所提到的mpp集群。
参考文献
[1] 费秀宏.基于Kafka的日志处理平台的研究[D].长春:吉林大学,2017.
[2] 薛瑞,朱晓民.基于Spark Streaming的实时日志处理平台设计与实现[J].电信工程技术与标准化,2015(09):55-58.
[3] EfemG.Mallach.决策支持与数据仓库系统[M].李昭智,李昭勇,译.北京:电子工业出版社,2001.
[4] 党寿江,刘学,王星凯,等.基于Spark Streaming的实时数据采集分析系统设计[J].网络新媒体技术,2017(05):48-53.
[5] 韩德志,陈旭光,雷雨馨,等.基于Spark Streaming的实时数据分析系统及其应用[J].计算机运用,2017(05):1263-1269.
编辑整理:万象杂志社编辑部
万象官方网站:http://www.sdnjbjb.com/