欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark_Spark foreachRDD 使用示例 以及注意事项

程序员文章站 2022-07-14 12:41:33
...

 

参考文章:

Spark Streaming foreachRDD的正确使用方式

https://blog.csdn.net/q954103/article/details/79439536

 

foreachRDD 主要用于向外部数据存储写入数据,下面给出一个实际的例子。

 

使用示例, 向Redis 存入黑名单。

IPAllImpImpoInfoPairDStream
		.groupByKey().foreachRDD(new VoidFunction<JavaPairRDD<String, Iterable<ImpoInfo>>>() {

	@Override
	public void call(JavaPairRDD<String, Iterable<ImpoInfo>> stringIterableJavaPairRDD) throws Exception {

		stringIterableJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Iterable<ImpoInfo>>>>() {

			@Override
			public void call(Iterator<Tuple2<String, Iterable<ImpoInfo>>> tuple2Iterator) throws Exception {

				//TODO : 旧版声明 实例方式
				RedisCluster redisCluster = new RedisCluster(configCenterBoardcast.value().getRedisConfig());
				JedisCluster jedisCluster = redisCluster.getClusterConn();

				//  JedisCluster jedisCluster = RedisClusterMulti.getJedisCluster();

				while (tuple2Iterator.hasNext()) {

					Tuple2<String, Iterable<ImpoInfo>> valueListTuple = tuple2Iterator.next();
					String ipKey = valueListTuple._1();

					//TODO 测试代码
					//System.out.println("remain key : " + ipKey);

					Iterator<ImpoInfo> impoInfoIterator = valueListTuple._2().iterator();

					//时间序列 队列
					List<Long> timeSeqList = new ArrayList<>();
					ImpoInfo tmpInfo = null;

					while (impoInfoIterator.hasNext()) {
						tmpInfo = impoInfoIterator.next();
						timeSeqList.add(tmpInfo.getTime());
					}

					String hashMapName = BlackListCalculateMidPrefix.IP_ALL_IMP_HASH;
					String hashMapInnerKey = ipKey;

					byte[] timeSeqBytes = jedisCluster.hget(hashMapName.getBytes(), hashMapInnerKey.getBytes());
					TimeSeq timeSeq = null;
					if (null != timeSeqBytes) {
						timeSeq = TimeSeqUtil.timeSeqDeSerialize(timeSeqBytes);
					} else {
						timeSeq = new TimeSeq();
					}

					timeSeq.addAll(timeSeqList);
					timeSeq.refresh(DurationEnum.ONE_HOUR);

					//TODO test
					System.out.println("ip all impression middle obj count : \n" + "key : " + ipKey + "\n" + "size : " + timeSeq.getTimeStampSize());

					if (timeSeq.getTimeStampSize() > BlackThreshold.ONE_HOUR_IP_ALL_IMP_THRESHOLD) {
						jedisCluster.set(BlackListPanelPrefix.IP_ALL_IMP + ipKey, "");
						jedisCluster.expire(BlackListPanelPrefix.IP_ALL_IMP + ipKey, 3600 * 2);
					}

					jedisCluster.hset(BlackListCalculateMidPrefix.IP_ALL_IMP_HASH.getBytes(), ipKey.getBytes(), TimeSeqUtil.timeSeqSerialize(timeSeq));

				}

				jedisCluster.close();

				//   我们使用的是redis3.0的集群,用jedis的JedisCluster.close()方法造成的集群连接关闭的情况。
				// jedisCluster内部使用了池化技术,每次使用完毕都会自动释放Jedis因此不需要关闭。
				// 如果调用close方法后再调用jedisCluster的api进行操作时就会出现如上错误。
				//Exception in thread "main" redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException: No reachable node in cluster
				// TODO : Unnecessary
				// jedisCluster.close();
			}
		});
	}
});

 

注意事项:

 

重点:Spark Streaming的foreachRDD运行在Driver端,而foreach和foreachPartion运行在Worker节点。

备注:对数据的向外输出,还是用foreach**算子好,不要用Map**算子,因为Map还要返回一个RDD。

 

 

误区一:在driver上创建连接对象(比如网络连接或数据库连接)

      如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。
 

dstream.foreachRDD { rdd=>

  val connection = createNewConnection()  // 在driver上执行

  rdd.foreach { record =>

    connection.send(record) // 在worker上执行

  }

}

 

 

误区二:为每一条记录都创建一个连接对象

dstream.foreachRDD { rdd=>

  rdd.foreach { record =>

    val connection = createNewConnection()

    connection.send(record)

    connection.close()

  }

}

 

 

解决方案:


     

 通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。

 

正确做法一:为每个RDD分区创建一个连接对象

dstream.foreachRDD { rdd=>

  rdd.foreachPartition { partitionOfRecords=>

    val connection = createNewConnection()

    partitionOfRecords.foreach(record =>connection.send(record))

    connection.close()

  }

}

 

   比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入底层MySQL中。这样可以大大减少创建的连接对象的数量。
 

 

正确做法二:为每个RDD分区使用一个连接池中的连接对象

 

dstream.foreachRDD { rdd=>

  rdd.foreachPartition { partitionOfRecords=>

    // 静态连接池,同时连接是懒创建的

    val connection = ConnectionPool.getConnection()

    partitionOfRecords.foreach(record =>connection.send(record))

   ConnectionPool.returnConnection(connection)  // 用完以后将连接返回给连接池,进行复用

  }

}