您所在位置: 网站首页 / 文档列表 / 行业软件 / 文档详情
Spark Streaming作业提交源码分析接收数据篇.pdf 立即下载
上传人:如灵****姐姐 上传时间:2024-09-04 格式:PDF 页数:2 大小: 金币:10 举报 版权申诉
预览加载中,请您耐心等待几秒...

Spark Streaming作业提交源码分析接收数据篇.pdf

SparkStreaming作业提交源码分析接收数据篇.pdf

预览

在线预览结束,喜欢就下载吧,查找使用更方便

10 金币

下载文档

如果您无法下载资料,请参考说明:

1、部分资料下载需要金币,请确保您的账户上有足够的金币

2、已购买过的文档,再次下载不重复扣费

3、资料包下载后请先用软件解压,在使用对应软件打开

SparkStreaming作业提交源码分析接收数据篇我们通过KafkaUtils.createStream函数可以创建KafkaReceiver类(这是默认的KafkaReceiver,如果spark.streaming.receiver.writeAheadLog.enable配置选项设置为true,则会使用ReliableKafkaReceiver,其中会使用WAL机制来保证数据的可靠性,也就是保证数据不丢失。)在KafkaReceiver类中首先会在onStart方法中初始化一些环境,比如创建Consumer(这个就是用来从Kafka的Topic中读取消息的消费者)。在初始化完相关环境之后会在线程池中启动MessageHandler来从Kafka中接收数据://HandlesKafkamessagesprivateclassMessageHandler(stream:KafkaStream[K,V])extendsRunnable{defrun(){logInfo("StartingMessageHandler.")try{valstreamIterator=stream.iterator()while(streamIterator.hasNext()){valmsgAndMetadata=streamIterator.next()store((msgAndMetadata.key,msgAndMetadata.message))}}catch{casee:Throwable=>logError("Errorhandlingmessage;exiting",e)}}}该线程负责从Kafka中读取数据,并将读取到的数据存储到BlockGenerator中(通过调用store方法实现),msgAndMetadata.key其实就是Topic的Key值;而msgAndMetadata.message就是我们要的消息。store函数是Receiver类提供的,所有继承自该类的子类(KafkaReceiver、ActorReceiver、ReliableKafkaReceiver等)都拥有该方法。其内部的实现是调用了blockGenerator的addData方法,最终是将数据存储在currentBuffer中,而currentBuffer其实就是一个ArrayBuffer[Any]。在blockGenerator内部存在两个线程:(1)、定期地生成新的batch,然后再将之前生成的batch封装成block。这里的定期其实就是spark.streaming.blockInterval参数配置的。(2)、将生成的block发送到BlockManager中。第一个线程定期地调用updateCurrentBuffer函数将存储在currentBuffer中的数据封装成Block,然后放在blocksForPushing中,blocksForPushing是ArrayBlockingQueue[Block]类型的队列,其大小默认是10,我们可以通过spark.streaming.blockQueueSize参数配置(当然,在很多情况下这个值不需要我们去配置)。当blocksForPushing没有多余的空间,那么该线程就会阻塞,直到有剩余的空间可用于存储新生成的Block。如果你的数据量真的很大,大到blocksForPushing无法及时存储那些block,这时候你就得考虑加大spark.streaming.blockQueueSize的大小了。updateCurrentBuffer函数的实现如下:/**Changethebuffertowhichsinglerecordsareaddedto.*/privatedefupdateCurrentBuffer(time:Long):Unit=synchronized{try{valnewBlockBuffer=currentBuffercurrentBuffer=newArrayBuffer[Any]if(newBlockBuffer.size>0){valblockId=StreamBlockId(receiverId,time-blockInterval)valnewBlock=newBlock(blockId,newBlockBuffer)listener.onGenerateBlock(blockId)blocksForPushing.put(newBlock)//putisblockingwhenqueueisfulllogDebug("Lastelementin"+block
单篇购买
VIP会员(1亿+VIP文档免费下)

扫码即表示接受《下载须知》

Spark Streaming作业提交源码分析接收数据篇

文档大小:

限时特价:扫码查看

• 请登录后再进行扫码购买
• 使用微信/支付宝扫码注册及付费下载,详阅 用户协议 隐私政策
• 如已在其他页面进行付款,请刷新当前页面重试
• 付费购买成功后,此文档可永久免费下载
年会员
99.0
¥199.0

6亿VIP文档任选,共次下载特权。

已优惠

微信/支付宝扫码完成支付,可开具发票

VIP尽享专属权益

VIP文档免费下载

赠送VIP文档免费下载次数

阅读免打扰

去除文档详情页间广告

专属身份标识

尊贵的VIP专属身份标识

高级客服

一对一高级客服服务

多端互通

电脑端/手机端权益通用

手机号注册 用户名注册
我已阅读并接受《用户协议》《隐私政策》
已有账号?立即登录
我已阅读并接受《用户协议》《隐私政策》
已有账号?立即登录
登录
手机号登录 微信扫码登录
微信扫一扫登录 账号密码登录

首次登录需关注“豆柴文库”公众号

新用户注册
VIP会员(1亿+VIP文档免费下)
年会员
99.0
¥199.0

6亿VIP文档任选,共次下载特权。

已优惠

微信/支付宝扫码完成支付,可开具发票

VIP尽享专属权益

VIP文档免费下载

赠送VIP文档免费下载次数

阅读免打扰

去除文档详情页间广告

专属身份标识

尊贵的VIP专属身份标识

高级客服

一对一高级客服服务

多端互通

电脑端/手机端权益通用