博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Big Data 每日一题20180922】sparkstreaming同时消费多个topic的数据实现exactly-once的语义
阅读量:4216 次
发布时间:2019-05-26

本文共 4595 字,大约阅读时间需要 15 分钟。

最近很多人问我,sparkstreaming怎么消费多个topic的数据,自己维护offest,其实这个跟消费一个topic是一样的,但还是有很多问我,今天就简单的写一个demo,供大家参考,直接上代码吧,已经测试过了.我把offest存到redis里了,当然也可以保存在zk,kafka,mysql,hbase中都可以,看自己的选择.(用了3个topic,每个topic5个partition.)

package spark import java.io.Fileimport kafka.{PropertiesScalaUtils, RedisKeysListUtils}import kafka.streamingRedisHive.{dbIndex}import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.TaskContextimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.kafka010._import redis.RedisPool object moreTopic {  def main(args: Array[String]): Unit = {    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)    val warehouseLocation = new File("hdfs://cluster/hive/warehouse").getAbsolutePath    val spark = SparkSession.builder().appName("Spark Jason").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()    spark.conf.set("spark.streaming.concurrentJobs", 10)    spark.conf.set("spark.streaming.kafka.maxRetries", 50)    spark.conf.set("spark.streaming.stopGracefullyOnShutdown",true)    spark.conf.set("spark.streaming.backpressure.enabled",true)    spark.conf.set("spark.streaming.backpressure.initialRate",5000)    spark.conf.set("spark.streaming.kafka.maxRatePerPartition", 3000)    @transient    val sc = spark.sparkContext    val scc = new StreamingContext(sc, Seconds(2))    val kafkaParams = Map[String, Object](      "auto.offset.reset" -> "latest",      "value.deserializer" -> classOf[StringDeserializer]      , "key.deserializer" -> classOf[StringDeserializer]      , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")      , "group.id" -> PropertiesScalaUtils.loadProperties("groupId")      , "enable.auto.commit" -> (false: java.lang.Boolean)    )    var stream: InputDStream[ConsumerRecord[String, String]] = null    val topics = Array("jason_20180519", "jason_0606","jason_test")    val maxTotal = 200    val maxIdle = 100    val minIdle = 10    val testOnBorrow = false    val testOnReturn = false    val maxWaitMillis = 5000    RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis)    val jedis = RedisPool.getPool.getResource    jedis.select(dbIndex)    val keys = jedis.keys(topics(0) + "*")    val keys_2 = jedis.keys(topics(1) +"*")    val keys_3 = jedis.keys(topics(2) +"*")    if(keys.size() == 0 && keys_2.size() == 0 && keys_3.size() == 0){      println("第一次启动,从头开始消费数据-----------------------------------------------------------")      stream = KafkaUtils.createDirectStream[String, String](        scc,        LocationStrategies.PreferConsistent,        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)      )    }else{      println("不是第一次启动,从上次的offest开始消费数据-----------------------------------------------")      stream = KafkaUtils.createDirectStream[String, String](        scc,        LocationStrategies.PreferConsistent,        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, RedisKeysListUtils.getRedisOffest(topics,jedis)))    }    jedis.close()    stream.foreachRDD(rdd=>{      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges      rdd.foreachPartition(partition=>{        val o = offsetRanges(TaskContext.get.partitionId)        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")        val jedis_jason = RedisPool.getPool.getResource        jedis_jason.select(dbIndex)        partition.foreach(pair=>{          //自己的计算逻辑;        })        offsetRanges.foreach { offsetRange =>          println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)          val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition          jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "")        }        jedis_jason.close()      })    })    scc.start()    scc.awaitTermination()  }}

转:https://blog.csdn.net/xianpanjia4616/article/details/81709075 --------------------- 本文来自 JasonLee_coding 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/xianpanjia4616/article/details/81709075?utm_source=copy

你可能感兴趣的文章
编写苹果游戏中心应用程序(翻译 1.3 为iOS应用程序设置游戏中心)
查看>>
编写苹果游戏中心应用程序(翻译 1.4 添加游戏工具包框架)
查看>>
编写苹果游戏中心应用程序(翻译 1.5 在游戏中心验证本地玩家)
查看>>
编写苹果游戏中心应用程序(翻译 1.6 获取本地玩家的信息)
查看>>
编写苹果游戏中心应用程序(翻译 1.7 在游戏中心添加朋友)
查看>>
编写苹果游戏中心应用程序(翻译 1.8 获取本地玩家的好友信息)
查看>>
WebGL自学教程《OpenGL ES 2.0编程指南》翻译——勘误表
查看>>
WebGL自学教程——WebGL示例:12. 要有光
查看>>
WebGL自学教程——WebGL示例:13.0 代码整理
查看>>
WebGL自学教程——WebGL示例:14.0 代码整理
查看>>
恶心的社会
查看>>
中国式危机公关9加1策略(第五章 慎用信息控制策略)
查看>>
展现自己的人生智慧
查看>>
android 电池(一):锂电池基本原理篇
查看>>
android 电池(二):android关机充电流程、充电画面显示
查看>>
android 电池(三):android电池系统
查看>>
android电池(四):电池 电量计(MAX17040)驱动分析篇
查看>>
ubuntu清除svn保存的username用户名和paasword密码
查看>>
如何过滤 adb logcat 输出
查看>>
ERROR 常数
查看>>