欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析

程序员文章站 2022-06-25 10:01:07
简介通过 pulsar-flink-connector 读取到 apache pulsar 中的namespaces、topics的元数据信息。pulsar-flink-connector 的 git...

简介

通过 pulsar-flink-connector 读取到 apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github:

maven

 <dependency>
   <groupid>io.streamnative.connectors</groupid>
   <artifactid>pulsar-flink-connector-2.11-1.12</artifactid>
   <version>2.7.3</version>
 </dependency>

   <!-- jar repositories -->
   <repositories>
        <repository>
            <id>central</id>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
        <repository>
            <id>bintray-streamnative-maven</id>
            <name>bintray</name>
            <url>https://dl.bintray.com/streamnative/maven</url>
        </repository>
    </repositories>

code

使用pulsarmetadatareader获取元数据

package com.levi.demo;

import org.apache.flink.streaming.connectors.pulsar.internal.pulsarmetadatareader;
import org.apache.pulsar.client.admin.pulsaradminexception;
import org.apache.pulsar.client.impl.auth.authenticationtoken;
import org.apache.pulsar.client.impl.conf.clientconfigurationdata;
import org.apache.pulsar.common.schema.schemainfo;
import org.apache.pulsar.common.schema.schematype;

import java.io.ioexception;
import java.util.hashmap;
import java.util.list;
import java.util.map;

/**
 * test.
 *
 * @author levi
 * @version 1.0
 **/
public class test {

    public static void main(string[] args)  {
        final clientconfigurationdata configurationdata = new clientconfigurationdata();
        configurationdata.setserviceurl("pulsar://127.0.0.1:6650");
        //your pulsar token
        final authenticationtoken token =
                new authenticationtoken(
                        "eyjxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx"); 
        configurationdata.setauthentication(token);
 
        try (final pulsarmetadatareader reader =
                     new pulsarmetadatareader("http://127.0.0.1:8443",
                             configurationdata,
                             "",
                             new hashmap(),
                             -1,
                             -1)) {
            //获取namespaces
            final list<string> namespaces = reader.listnamespaces();
            system.out.println("namespaces: " + namespaces.tostring());
            
            for (final string namespace : namespaces) {
                //获取topics
                final list<string> topics = reader.gettopics(namespace);
                system.out.println("topic: " + topics.tostring());
                
                for (string topic : topics) {
                    //获取字段schemainfo
                    final schemainfo schemainfo = reader.getpulsarschema(topic);
                    final string name = schemainfo.getname();
                    system.out.println("schemaname:" + name); //topicname
                    final schematype type = schemainfo.gettype(); 
                    system.out.println("schematype:" + type.tostring());// "json"...
                    final map<string, string> properties = schemainfo.getproperties();
                    system.out.println(properties); 
                    final string schemadefinition = schemainfo.getschemadefinition();
                    system.out.println(schemadefinition); // field info.
                }
            }

        } catch (ioexception | pulsaradminexception e) {
            e.printstacktrace();
        }


    }


}

到此这篇关于java使用pulsar-flink-connector读取pulsar catalog元数据的文章就介绍到这了,更多相关java读取pulsar catalog元数据内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!