org.apache.spark.SparkException: Task not serializable


Error details

org.apache.spark.SparkException: Task not serializable  
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
	at exam.Five$$anonfun$3.apply(Five.scala:66)
	at exam.Five$$anonfun$3.apply(Five.scala:42)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:256)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:256)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:255)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisPool
Serialization stack:
	- object not serializable (class: redis.clients.jedis.JedisPool, value: redis.clients.jedis.JedisPool@...)
	- field (class: exam.Five$$anonfun$3$$anonfun$apply$2, name: cs$1, type: class redis.clients.jedis.JedisPool)
	- object (class exam.Five$$anonfun$3$$anonfun$apply$2, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
	... 30 more

Cause

The "Task not serializable" error usually occurs because a function passed to map, filter, foreachPartition and similar operators captures an external variable that cannot be serialized. It is especially common when the closure references a member method or field of some class (often the enclosing class), because that forces the whole object, with all of its members, to be serializable.
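
A minimal sketch of the pattern that triggers the error (the object name ClosureDemo, the local master, and the sample data are made up for illustration; JedisPool stands in for any class that does not implement java.io.Serializable):

import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-demo").setMaster("local[2]"))

    // Created on the driver; JedisPool is not java.io.Serializable
    val pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 1000)

    // The lambda captures `pool`, so Spark must serialize it to ship the task
    // to executors -> org.apache.spark.SparkException: Task not serializable
    sc.parallelize(Seq(("a", 1L), ("b", 2L))).foreachPartition { iter =>
      iter.foreach { case (k, v) => pool.getResource.incrBy(k, v) }
    }

    sc.stop()
  }
}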

Problematic code

//  Read the messages from Kafka
val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  //  Process the Kafka messages
  stream.foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val lin = rdd.map(_.value())
//      lin.foreach(println(_))
      val res = lin.map(x => {
        val l = x.split(" ")
        (l(0), l(2), l(4).trim.toInt)
      })
      // ******************* Redis connection pool, created on the driver
      val rcon = new JedisPoolConfig()
      rcon.setMaxIdle(3)
      val cs = new JedisPool(rcon, "localhost", 6379, 1000)
      //  (5) Compute the real-time turnover and save it to Redis (5 points)
      cs.getResource.select(6)
      val res2 = res.map(_._3).reduce(_ + _)
      println(res2)
      cs.getResource.incrBy("实时成交额", res2)
      //  (6) Compute the real-time number of buyers and save it to Redis (5 points)
      val res3 = res.map(_._1).distinct().count().toInt
      println(res3)
      cs.getResource.incrBy("购买人数", res3)
      //  (7) Compute the real-time turnover per category and save it to Redis (5 points)
      val res4 = res.map(xx => (xx._2, xx._3)).reduceByKey(_ + _)
      res4.foreach(println(_))
      res4.foreachPartition(c => {
        c.foreach(v => {
          // ******************* `cs` is the pool defined outside on the driver; capturing it
          // here means Spark must serialize it, which fails and raises the exception
          cs.getResource.incrBy(v._1, v._2)
        })
      })
      cs.getResource.close()
      cs.close()
    }
  })

Solution

Obtain the connection pool again inside foreachPartition, so that each partition creates its own pool on the executor instead of capturing the driver-side one.

      //  (7) Compute the real-time turnover per category and save it to Redis (5 points)
      val res4 = res.map(xx => (xx._2, xx._3)).reduceByKey(_ + _)
      res4.foreach(println(_))
      res4.foreachPartition(c => {
        val rconn = new JedisPoolConfig()   // *********** build the connection pool inside the partition, on the executor
        rconn.setMaxIdle(3)
        val css = new JedisPool(rconn, "localhost", 6379, 1000)
        c.foreach(v => {
          css.getResource.incrBy(v._1, v._2)   // use the executor-side pool `css`, not the driver-side `cs`
        })
        css.getResource.close()
        css.close()
      })
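
An alternative, sketched below as an assumption rather than the article's own method: keep one pool per executor JVM in a lazily initialized singleton object, so nothing non-serializable is captured by the task closure and no fresh pool is built for every partition of every micro-batch. The object name RedisPools, the host/port, and the use of Jedis 3.x (where closing a pooled Jedis returns it to the pool) are assumptions for illustration.

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// One pool per executor JVM: initialized lazily the first time an executor
// touches it, so it never travels inside the serialized task closure.
object RedisPools {
  lazy val pool: JedisPool = {
    val conf = new JedisPoolConfig()
    conf.setMaxIdle(3)
    new JedisPool(conf, "localhost", 6379, 1000)   // assumed host/port
  }
}

// Usage inside the streaming job:
res4.foreachPartition(part => {
  val jedis: Jedis = RedisPools.pool.getResource
  try {
    part.foreach(v => jedis.incrBy(v._1, v._2))
  } finally {
    jedis.close()   // returns the connection to the pool (Jedis 3.x)
  }
})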