您所在位置: 网站首页 / 文档列表 / 行业软件 / 文档详情
Spark Streaming作业提交源码分析数据处理篇.pdf 立即下载
上传人:新槐****公主 上传时间:2024-09-04 格式:PDF 页数:2 大小: 金币:10 举报 版权申诉
预览加载中,请您耐心等待几秒...

Spark Streaming作业提交源码分析数据处理篇.pdf

SparkStreaming作业提交源码分析数据处理篇.pdf

预览

在线预览结束,喜欢就下载吧,查找使用更方便

10 金币

下载文档

如果您无法下载资料,请参考说明:

1、部分资料下载需要金币,请确保您的账户上有足够的金币

2、已购买过的文档,再次下载不重复扣费

3、资料包下载后请先用软件解压,在使用对应软件打开

SparkStreaming作业提交源码分析数据处理篇在调用StreamingContext的start函数的时候,会调用JobScheduler的start函数。而JobScheduler的start函数会启动ReceiverTracker和jobGenerator。在启动jobGenerator的时候,系统会根据这次是从Checkpoint恢复与否分别调用restart和startFirstTime函数。/**Startgenerationofjobs*/defstart():Unit=synchronized{if(eventActor!=null)return//generatorhasalreadybeenstartedeventActor=ssc.env.actorSystem.actorOf(Props(newActor{defreceive={caseevent:JobGeneratorEvent=>processEvent(event)}}),"JobGenerator")if(ssc.isCheckpointPresent){restart()}else{startFirstTime()}}}startFirstTime函数会分别启动DStreamGraph和JobGenerator线程privatedefstartFirstTime(){valstartTime=newTime(timer.getStartTime())graph.start(startTime-graph.batchDuration)timer.start(startTime.milliseconds)logInfo("StartedJobGeneratorat"+startTime)}privatevaltimer=newRecurringTimer(clock,ssc.graph.batchDuration.milliseconds,longTime=>eventActor!GenerateJobs(newTime(longTime)),"JobGenerator")JobGenerator线程会每隔ssc.graph.batchDuration.milliseconds的时间生成Jobs,这个时间就是我们初始化StreamingContext的时候传进来的,生成Jobs是通过Akka调用generateJobs方法:*Generatejobsandperformcheckpointforthegiven`time`.*/privatedefgenerateJobs(time:Time){//SetthesparkEnvinthisthread,sothatjobgenerationcodecanaccessthe//environmentExample:BlockRDDsarecreatedinthisthread,anditneeds//toaccessBlockManager//Update:ThisisprobablyredundantafterthreadlocalstuffinsparkEnvhas//beenremoved.SparkEnv.set(ssc.env)Try{//allocatereceivedblockstobatchjobScheduler.receiverTracker.allocateBlocksToBatch(time)graph.generateJobs(time)//generatejobsusingallocatedblock}match{caseSuccess(jobs)=>valreceivedBlockInfos=jobScheduler.receiverTracker.getBlocksOfBatch(time).mapValues{_.toArray}jobScheduler.submitJobSet(JobSet(time,jobs,receivedBlockInfos))caseFailure(e)=>jobScheduler.reportError("Errorgeneratingjobsfortime"+time,e)}eventActor!DoCheckpoint(time)}在generateJobs方法中的jobScheduler.receiverTracker.allocateBlocksToBatch(time)很重要,其最终调用的是allocateBlocksToBatch函数,其定义如下:defallocateBlocksTo
单篇购买
VIP会员(1亿+VIP文档免费下)

扫码即表示接受《下载须知》

Spark Streaming作业提交源码分析数据处理篇

文档大小:

限时特价:扫码查看

• 请登录后再进行扫码购买
• 使用微信/支付宝扫码注册及付费下载,详阅 用户协议 隐私政策
• 如已在其他页面进行付款,请刷新当前页面重试
• 付费购买成功后,此文档可永久免费下载
年会员
99.0
¥199.0

6亿VIP文档任选,共次下载特权。

已优惠

微信/支付宝扫码完成支付,可开具发票

VIP尽享专属权益

VIP文档免费下载

赠送VIP文档免费下载次数

阅读免打扰

去除文档详情页间广告

专属身份标识

尊贵的VIP专属身份标识

高级客服

一对一高级客服服务

多端互通

电脑端/手机端权益通用

手机号注册 用户名注册
我已阅读并接受《用户协议》《隐私政策》
已有账号?立即登录
我已阅读并接受《用户协议》《隐私政策》
已有账号?立即登录
登录
手机号登录 微信扫码登录
微信扫一扫登录 账号密码登录

首次登录需关注“豆柴文库”公众号

新用户注册
VIP会员(1亿+VIP文档免费下)
年会员
99.0
¥199.0

6亿VIP文档任选,共次下载特权。

已优惠

微信/支付宝扫码完成支付,可开具发票

VIP尽享专属权益

VIP文档免费下载

赠送VIP文档免费下载次数

阅读免打扰

去除文档详情页间广告

专属身份标识

尊贵的VIP专属身份标识

高级客服

一对一高级客服服务

多端互通

电脑端/手机端权益通用