
Flink Getting Started (Part 2): Using Kafka as Source and Sink


Installing and Using Kafka on a Mac

Kafka requires ZooKeeper as its coordination service, so install it first. On a Mac, install Homebrew and then use it to install both Kafka and ZooKeeper:

brew install kafka
brew install zookeeper
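
Homebrew can also run both as background services (the standard brew services subcommands), which saves managing the processes by hand:

brew services start zookeeper
brew services start kafka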

To start them manually instead, go under /usr/local/Cellar and launch ZooKeeper and Kafka separately.
Start ZooKeeper with:

zkServer start
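
To confirm it came up, zkServer also has a status subcommand; on a single local node it should report standalone mode:

zkServer status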

The common Kafka commands are as follows.

Start the Kafka server:
kafka-server-start /usr/local/etc/kafka/server.properties
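
Once the broker is running you can sanity-check it; kafka-broker-api-versions ships with Kafka and simply queries a live broker:

kafka-broker-api-versions --bootstrap-server localhost:9092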

Start a console producer (use the same pktest topic that the consumer and the Flink jobs below read from):
kafka-console-producer --broker-list localhost:9092 --topic pktest

Start a console consumer:
kafka-console-consumer --bootstrap-server localhost:9092 --topic pktest
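
By default the console consumer only shows messages produced after it starts; the standard --from-beginning flag replays the topic from the earliest offset:

kafka-console-consumer --bootstrap-server localhost:9092 --topic pktest --from-beginning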

Create a topic (one partition, replication factor 1):
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pktest
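
On recent Kafka releases (2.2 and later) kafka-topics can also talk to the broker directly instead of going through ZooKeeper; the equivalent command is:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic pktest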

List all topics:
kafka-topics --list --zookeeper localhost:2181
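
To inspect a single topic's partitions and replicas, the same tool has a --describe mode:

kafka-topics --describe --zookeeper localhost:2181 --topic pktest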

Using Kafka as a Source (Scala)
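
Both examples assume the Flink Kafka connector is on the classpath. A minimal build.sbt sketch, assuming Flink 1.9.0 on Scala 2.11 (hypothetical versions; match them to your installation):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.9.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.0",
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"
)

With that in place, the source job looks like this: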

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object StreamingJob {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val topic = "pktest"

    // Kafka consumer configuration: broker address and consumer group
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "test")

    // read the topic as a stream of strings and print each record
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
    data.print()

    env.execute("StreamingJob")
  }
}

In the example above, Flink acts as the Kafka consumer: type lines into the console-producer window and the output appears in the Flink console.
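
The Kafka source is an ordinary DataStream, so the usual transformations can go between addSource and the output. As a sketch, replacing data.print() above with the following (standard DataStream operators) turns the job into a running word count over the consumed messages:

    data
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty)) // split each message into words
      .map((_, 1))                                             // pair each word with a count of 1
      .keyBy(0)                                                // key by the word
      .sum(1)                                                  // running sum per word
      .print()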

Using Kafka as a Sink (Scala)

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

object KafkaConnectorProducer {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read lines from a local socket, fed by `nc -lk 9999`
    val data = env.socketTextStream("localhost", 9999)

    val topic = "pktest"

    // the producer only needs the broker address; no group.id is required
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")

    // write each line to the Kafka topic as a plain string;
    // the wrapper adapts SimpleStringSchema to the producer's expected schema type
    val kafkaSink = new FlinkKafkaProducer[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties)
    data.addSink(kafkaSink)

    env.execute("KafkaConnectorProducer")
  }
}

This example uses nc -lk 9999 as the data source. Flink reads the socket stream and writes it to the topic: type some characters into the nc -lk 9999 window and press Enter, and the data shows up in the Kafka console-consumer window.
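
One caveat: this sink only provides at-least-once delivery when checkpointing is enabled. The universal connector's constructor also takes an explicit Semantic argument; a sketch under the same Flink 1.9 assumption as above:

    // checkpoint every 5 seconds so the producer flushes on checkpoints
    env.enableCheckpointing(5000)

    val kafkaSink = new FlinkKafkaProducer[String](
      topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

Semantic.EXACTLY_ONCE is also available, but it relies on Kafka transactions and needs transaction.timeout.ms tuned to fit within the broker's limit.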