
Spark Study Notes: Using Spark Streaming


1. Spark Streaming

  • Spark Streaming is a real-time computation framework built on top of Spark Core; it can consume data from many sources and process it
  • The most basic abstraction in Spark Streaming is the DStream (discretized stream), which is essentially a continuous series of RDDs; a DStream is a wrapper around RDDs
  • A DStream can be thought of as an RDD factory: every RDD it produces carries the same processing logic, and only the data each RDD reads differs
  • Within one batch interval, a DStream produces exactly one RDD
  • A DStream is effectively a "template": the RDD generated in each batch interval is processed according to this template, and this is also the basis for building the RDD DAG (see the sketch after this list)
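
A quick way to observe the one-RDD-per-batch behaviour is foreachRDD, which exposes the RDD generated for each batch. The following is only a minimal sketch, not part of the original notes; the host name and port are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: one RDD is generated per batch interval, and the same logic is applied to each of them
val conf = new SparkConf().setAppName("dstream-as-rdd-factory").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999) // placeholder host/port
lines.foreachRDD { (rdd, time) =>
  // Every 3-second batch hands us exactly one RDD built from the DStream "template"
  println(s"Batch at $time produced an RDD with ${rdd.getNumPartitions} partitions")
}
ssc.start()
ssc.awaitTermination()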

2. Popular real-time computation engines today

| Engine | Throughput | Language | Processing speed | Ecosystem |
| --- | --- | --- | --- | --- |
| Storm | Relatively low | Clojure | Very fast (sub-second) | Alibaba (JStorm) |
| Flink | Relatively high | Scala | Fast (sub-second) | Less widely used in China |
| Spark Streaming | Very high | Scala | Fast (milliseconds) | Mature ecosystem |

3. Processing socket data with Spark Streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

// Create the StreamingContext. At least two threads are needed: one to receive data and one to process it.
val conf = new SparkConf().setAppName("ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
// Start the streaming computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
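
To try the example, you can feed the socket from the other side with a netcat listener, e.g. by running `nc -lk 44444` on uplooking01 (the host name used above) and typing comma-separated words into it; each 3-second batch then prints its word counts.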

4. Two ways for Spark Streaming to receive data (from Kafka)

Receiver-based

  • Offsets are maintained by ZooKeeper
  • Uses Kafka's high-level (consumer) API
  • Simple to program
  • Low efficiency (a write-ahead log, WAL, is enabled to guarantee data safety; a configuration sketch follows this list)
  • The receiver approach was dropped entirely in the Kafka 0.10 integration
  • Generally not used in production
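
To illustrate the WAL point above: enabling the receiver write-ahead log comes down to a Spark configuration flag plus a checkpoint directory where the log is stored. This is only a sketch, not part of the original notes; the HDFS path is a placeholder.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Sketch: turn on the receiver WAL so received data is also persisted before being processed
val conf = new SparkConf()
  .setAppName("receiver-with-wal")
  .setMaster("local[2]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Milliseconds(3000))
ssc.checkpoint("hdfs://uplooking01:9000/spark/checkpoint") // placeholder path; the WAL lives under it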

Direct

  • Offsets are maintained manually by us (see the sketch after this list)
  • High efficiency (Spark Streaming reads directly from the Kafka partitions)
  • Programming is more complex
  • This is the approach generally used in production
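
As a rough sketch of what "maintaining the offsets ourselves" means with the direct approach (using the spark-streaming-kafka 0.8 integration from section 5 below): every batch RDD carries its Kafka offset ranges, which we read out and persist wherever we choose. Here `inputDS` refers to the direct stream created in section 5; the storage step itself is only indicated by a comment.

import org.apache.spark.streaming.kafka.HasOffsetRanges

inputDS.foreachRDD { rdd =>
  // Each RDD of a direct stream knows which Kafka offsets it was built from
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    println(s"topic=${range.topic} partition=${range.partition} " +
      s"from=${range.fromOffset} until=${range.untilOffset}")
  }
  // ...process the batch here, then save offsetRanges to ZooKeeper, a database, etc.
}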

5. Integrating Spark Streaming with Kafka

Receiver-based integration (not recommended for production; removed in the 0.10 integration)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the StreamingContext. At least two threads are needed: one to receive data and one to process it.
val conf = new SparkConf().setAppName("ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
val topics = Map("hadoop" -> 3)
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
receiverDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()

Direct approach (used in production)

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the StreamingContext. At least two threads are needed: one to receive data and one to process it.
val conf = new SparkConf().setAppName("ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
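
For reference, since the 0.10 integration drops the receiver entirely, the equivalent direct code with the spark-streaming-kafka-0-10 artifact uses LocationStrategies/ConsumerStrategies and can commit offsets back to Kafka itself. The sketch below follows the standard 0.10 API and is not part of the original notes; the broker addresses, topic and group id are reused from the examples above.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf().setAppName("direct-0-10").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "myid",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // commit manually after processing
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Set("hadoop"), kafkaParams))
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.flatMap(_.value().split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  // Commit the processed offsets back to Kafka
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
ssc.start()
ssc.awaitTermination()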

6. Architecture of real-time stream computation

(Architecture diagram)

1. Generating logs (simulating users' access logs for a web application)

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Random;

public class GenerateAccessLog {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Data pools used to assemble random log lines
    int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
    String[] requestTypes = {"GET", "POST"};
    String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};

    String[] courseNames = {"大数据", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"};
    String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
    FileWriter fw = new FileWriter(args[0]);
    PrintWriter printWriter = new PrintWriter(fw);
    while (true) {
      //      Thread.sleep(1000);
      // Generate each field of the log line
      String date = new Date().toLocaleString();
      String method = requestTypes[getRandomNum(0, requestTypes.length)];
      String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
      String httpVersion = "HTTP/1.1";
      String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
      String reference = references[getRandomNum(0, references.length)];
      String rowLog = date + " " + method + " " + url + " " + httpVersion + " " + ip + " " + reference;
      printWriter.println(rowLog);
      printWriter.flush();
    }
  }


  // Returns a random int in [start, end)
  public static int getRandomNum(int start, int end) {
    int i = new Random().nextInt(end - start) + start;
    return i;
  }
}
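
The generator writes one log line per loop iteration to the file given as args[0]; for the rest of this walkthrough that should be the file the Flume exec source tails below, i.e. /logs/access.log.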

2. Using Flume (with Avro) to collect log data from the web application server

Collecting the output of the exec command into an Avro sink

# the configuration file needs to define the sources, 
# the channels and the sinks.
# sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1

#define sources
f1.sources.r1.type = exec
f1.sources.r1.command = tail -f /logs/access.log

#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100

#define sink: ship the collected logs to uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444

#bind sources and sink to channel 
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1

Collecting from the Avro source to the console
# the configuration file needs to define the sources, 
# the channels and the sinks.
# sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = logger

#bind sources and sink to channel 
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2

Collecting from the Avro source into Kafka
# the configuration file needs to define the sources, 
# the channels and the sinks.
# sources, channels and sinks are defined per agent, 
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2

#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444

#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100

#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1
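
To run the pipeline, the downstream agent (f2, which opens the Avro source on uplooking03:44444) is normally started before the upstream agent (f1, whose Avro sink connects to it), each with something along the lines of `flume-ng agent --name f2 --conf $FLUME_HOME/conf --conf-file <f2-config-file> -Dflume.root.logger=INFO,console` (the config file path here is a placeholder); otherwise the Avro sink has nothing to connect to.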

That is all for this article; I hope it is helpful for your study of Spark Streaming.