欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Storm中文官方文档翻译计划(1) ——从入门到精通

程序员文章站 2022-07-13 17:07:24
...
Storm中文官方文档翻译计划(1)
——从入门到精通


    Storm是一个分布式实时计算系统。就像Hadoop提供一组通用原语来进行批量处理(batch processing)一样,Storm也提供了一组通用原语来进行实时计算(realtime computation)。Storm非常简单,能用于任意编程语言,被很多大的公司采用,并且使用过程中乐趣多多。
    本教程中,你会学习如何创建Storm拓扑(topologies),以及如何部署它们到Storm集群中。Java是我们使用的主要语言,但是一些例子会使用到Python来展示Storm的多语言能力。

    Storm集群组件(Components of a Storm cluster)
    Storm集群看起来类似于Hadoop集群。然而,在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是” topologies”。”Jobs”和”topologies”本身就非常的不同,其中一个主要的一个区别是,MapReduce的任务最终会结束,而拓扑则永远在处理消息(直到你干掉它)。
    torm集群上有两种类型的节点:主节点(master node)和工作节点(worker node)。主节点运行一个称为Nimbus的守护进程,类似于Hadoop的JobTracker。Nimbus负责分发代码到集群中,分配任务到机器,并且监控失败。
    每一个工作节点运行一个称为Supervisor的守护进程。supervisor监听分配给这台机器的工作,并且基于Nimbus分配情况在必要时启动和停止工作进程。每一个工作进程执行一个拓扑的子集合;一个运行中的拓扑由横跨多个机器的多个工作进程组成。
Storm中文官方文档翻译计划(1) ——从入门到精通
            
    
    博客分类: 大数据 storm大数据腾讯实时计算Hadoop
    Numbus和Supervisor的所有协调同步工作是由Zookeeper集群完成的。另外,Nimbus和Supervisor守护进程是快速失败和无状态的,所有的状态保存在Zookeeper或者本地磁盘中。这意味着,你可以kill -9Nimbus或者Supervisor,它们会像什么都没有发生一样再启动起来。这个设计使得Storm集群变得难以置信的稳定。

    拓扑(Topologies)
    为了在Storm上进行实时计算,你需要创建一个我们称为拓扑的东西。一个拓扑就是一个计算图谱。拓扑中的每一个节点包含着处理逻辑,以及表明数据如何在节点间传递的很多链接。
    运行一个拓扑非常的简单。首先,你将你所有的代码和依赖打包进一个jar中。然后,运行类似下面这样的命令行:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

    上面使用参数arg1和arg2来运行backtype.storm.MyTopology类。这个类的主要功能是定义了拓扑,并且将其提交给Nimbus。storm jar部分负责连接到Nimbus并且上传这个jar文件。
    因为拓扑的定义就是Thrift结构体,Nimbus是一个Thrift服务,所以你可以创建和提交然一编程语言的拓扑。上面的例子是基于JVM语言实现的最简单方式。要想了解如何在生产环境集群中运行拓扑,持续关注本系列后续文章(译者注)。

    流(Streams)
    Storm中的核心抽象概念就是流。流是无边界的元组(tuples)的序列。Storm提供以分布式的、可靠的方式转换一个流到一个新流的原语。例如,你可以转换一个微博的流到一个流行话题的流。
    Storm提供处理流转换的基本原语是spouts和bolts。,实现spout和bolt提供的接口来运行你应用程序相关的逻辑。
    spout是流的源头。例如,一个spout从Kestrel队列读取元组,然后作为流发射(emit)它们。一个spout也可以连接到Twitter API,然后发射一个微博的流。
    一个bolt消费任意数量的输入流,进行一些处理,然后可能发射新的流。复杂的流转换,例如从微博流中计算流行话题的流,需要更多的步骤以及更多的bolt。bolt可以干任意事情,从运行函数,过滤元组,聚合(aggregations)流,连接(joins)流,到与数据库交互,等等。
    spout和bolt组成的网络被打包进一个拓扑中,这就是你提交给Storm集群执行的最高层次抽象。拓扑就是一个流的转换图谱,每一个节点就是一个spout或者bolt。图中的边表示哪个流被哪个流订阅。当一个spout或者bolt发射一个元组到一个流,也会发送这个元组到每一个订阅这个流的bolt。
Storm中文官方文档翻译计划(1) ——从入门到精通
            
    
    博客分类: 大数据 storm大数据腾讯实时计算Hadoop
    拓扑中节点间的链接表示元组该如何被传递。例如,Spout A和Spout B之间有一个链接,Spout A到Bolt C有一个链接,Bolt B到Bolt C有一个链接,那么每次Spout A发射一个元组,也会发送这个元组到Bolt B和Bolt C。Bolt B的所有输出元组也会到达Bolt C。
    Storm拓扑中的每个节点是并行执行的。在你的拓扑中,你可以为每个节点指定你想要的并行度(parallelism),然后Storm会在集群中分配这么多数量的线程来执行。
    拓扑永远在运行中,除非你干掉它。Storm会自动重新分配失败的任务。另外,Storm保证不会有数据丢失,及时机器挂掉了以及消息丢失了。

    数据模型(Data model)
    Storm使用元组作为数据模型。元组是命名的值列表,元组中的字段可以是任何类型的对象。Storm支持所有基本类型,字符串和梓杰数组作为元组的字段值。要使用一个其他类型的对象,你只需要为该类型实现一个序列化器即可。
    拓扑中的每一个节点必须声明它发射的元组的输出字段。例如,下面的bolt声明其发射double和triple字段的一个二元组。
public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

    declareOutputFields函数为组件声明了输出字段["double", "triple"]。这个bolt的剩下部分会在下面的小节详细解释。

    一个简单的拓扑(A simple topology)
    让我们来看一个简单的拓扑来探索更多的概念,然后看看这些代码是怎么来的。我们看看storm-starter中的ExclamationTopology定义:
TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

    这个拓扑包含1个spout和2个bolt。spout发射单词,每个bolt追加字符串”!!!”到输入。节点被线性安排:spout发射到第一个bolt,然后第一个bolt发射给第二个bolt。如果spout发射元组[“bob”]和[“john”],之后第二个bolt会发射单词[“bob!!!!!!”]和[“john!!!!!!”]。
    这段代码使用setSpout和setBolt方法来定义节点。这个方法一个用户相关的id,一个包含处理逻辑的对象,以及这个节点你想要的并行度作为输入。在这个例子中,spout被赋予的id为”words”,bolt被赋予的id为”exclaim1”和”exclaim2”。
    包含处理逻辑的对象实现了IrichSpout接口作为spout,实现了IrichBolt接口作为bolt。
最后一个参数,你期待的节点并行度,是可选的。它表明集群中该使用多少个线程执行这个组件。如果你忽略它,Storm只会为这个节点分配一个线性。
    setBolt返回一个InputDeclarer对象,用于定义到bolt的输入。这里,组件exclaim1通过shuffle grouping声明它想要读取组件words发射的所有元组,组件exclaim2通过shuffle grouping声明它想读取组件exclaim1发射的所有元组。shuffle grouping意思是元组应该从输入任务被随机的分布到bolt任务。有多种在组件间分组数据的方式。这些将会在下面的小节中详细解释。
    如果你想让组件exclaim2读取所有由组件words和组件exclaim1发射的元组,你需要像下面这样定义组件exclaim2:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

    如你所见,输入声明可以是链式的以为bolt指定多个来源。
    让我们再深入研究一下这个拓扑中的spout和bolt实现。spout负责发射新消息到拓扑中。这个拓扑中,TestWordSpout每隔100ms从 ["nathan", "mike", "jackson", "golda", "bertels"]中发射一个随机单词一元组。TestWordSpout 中的nextTuple()实现看起来像这样:
public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

    你可以看到,实现非常的简单。
    ExclamationBolt追加字符串”!!!”到其输入。让我们看一下ExclamationBolt的完整实现:
public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public Map getComponentConfiguration() {
        return null;
    }
}

    prepare方法提供一个OutputCollector给bolt,用于bolt发射元组。元组可以在任意时刻从bolt被发射——在prepare,execute或者cleanup方法中,或者甚至在另一个线程中异步发射。这个prepare实现只是保存OutputCollector作为一个实例变量,以在后面的execute方法中使用。
    execute方法接受来自某个bolt输入中的元组。ExclamationBolt从一个元组中抓取第一个字段,然后发射一个被追加”!!!”的新的元组。如果你实现一个订阅多个输入源的bolt,你可以通过使用Tuple#getSourceComponent方法来分辨元组来自于哪个组件。
    execute方法中还有一些其他的信息,也就是输入元组作为第一个参数传递给emit,并且输入元组在最后一行被确认(acked),这些是属于Storm reliability API的部分,以保证数据不会被丢失,后文中会详细解释。
    cleanup方法当Bolt被关闭时被调用,用于清理我们打开的任意资源。集群中这个方法是否被调用没有保证:例如,如果运行任务的机器着火了,就没有办法调用这个方法。cleanup方法用来当你在本地模式下运行拓扑时(在进程中模拟Storm集群),你想能够运行和干掉许多拓扑而免遭任何资源泄露。
    declareOutputFields方法声明ExclamationBolt发射只有一个word字段的一元组。
getComponentConfiguration方法允许你配置这个组件如何运行的方方面面。这是一个更高级的主题,我们会在本系列后续文章中深入解释(译者注)。
    cleanup和getComponentConfiguration方法通常不需要实现。你可以使用一个基类更方便的定义bolt,基类提供了默认的合适的实现。ExclamationBolt可以通过继承BaseRichBolt更方便的实现,像这样:
public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}


    在本地模式下运行ExclamationTopology (Running ExclamationTopology in local mode)
    让我们看看如何在本地模式下运行ExclamationTopology,以及它是怎么运转的。
    Storm有两种运行模式:本地模式和分布式模式。在本地模式下,Storm完全在用线程模拟工作节点的进程中运行。本地模式在测试和开发拓扑时非常有用。当你运行storm-starter中的拓扑时,它们就运行在本地模式,你可以看到每个组件发射了什么消息。你可以在另一篇中看到在本地模式运行拓扑的更详细信息(译者注)。
    在分布式模式下,Storm像一个机器集群一样运行。当你提交一个拓扑到主节点,你也提交了所有的必要代码来运行这个拓扑。主节点负责发布你的代码并且分配工作节点来运行你的拓扑。如果工作节点挂掉了,主节点会重新分配它们到其他地方。你可以在另一篇中看到在生产集群中运行拓扑的更详细信息(译者注)。
    下面是在本地模式运行ExclamationTopology的代码:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

    最开始,代码通过创建LocalCluster对象来定义一个进程内集群。提交一个拓扑到这个虚拟集群等同于提交拓扑到分布式集群。通过调用submitTopology来提交一个拓扑到LocalCluster,以一个拓扑名字,一个拓扑配置,以及拓扑本身作为参数。
    名字用于识别这个拓扑,这样你可以在稍后干掉它。一个拓扑会无限的运行下去直到你干掉它。
    配置用于跳着拓扑运行的方方面面。下面的两个配置非常常用:
    1 TOPOLOGY_WORKERS (使用setNumWorkers设置)指定你想在集群中分配多少个进程来运行这个拓扑。拓扑中的每一个组件都会作为许多线程运行。某个组件被分配的线程数通过setBolt和setSpout方法来配置。这些线程存在于工作进程中。每一个工作进程包含若干组件的若干线程在其中。例如,你在你所有的组件中指定了300个线程,在配置中丁丁了50个工作进程。每个工作进程将会执行6个线程,每一个属于一个不同的组件。你通过调整每个组件的并行性以及运行这些线程的工作者进程的数量来调优Storm拓扑的性能。
    2 TOPOLOGY_DEBUG (使用setDebug设置)当设置为true时,告诉Storm记录每个组件发射的每个消息。这在本地模式下测试拓扑时非常有用,但是你很可能想要在集群中运行拓扑时保持这个关闭。
还有很多其他的配置你可以用于设置拓扑。更多细节请参考Config的Javadoc。
要学习如何设置开发环境,以让你能够在本地模式下运行拓扑(例如在Eclipse中),请参考本系列下一篇(译者注)。

    流分组(Stream groupings)
    流分组告诉拓扑如何在组件间发送元组。记住,spout和bolt以很多任务的形式在集群中并行执行。如果你从任务级别来观察拓扑如何执行,看起来就像是这样:
Storm中文官方文档翻译计划(1) ——从入门到精通
            
    
    博客分类: 大数据 storm大数据腾讯实时计算Hadoop
    当Bolt A的一个任务发射一个元组给Bolt B,这个元组会被发送给哪个任务?
    流分组通过告诉Storm如何在任务集合间发送元组来回答这个问题。在我们深入到不同的流分组类型之前,让我们一起看看storm-starter中的另一个拓扑。WordCountTopology中的spout读取句子,然后WordCountBolt输出每个单词之前出现的次数:
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

    SplitSentence发射每个它收到的句子的每个单词的元组,WordCount在内存中保存一个从单词到技术的映射。每次WordCount收到一个单词,就更新它的状态,然后发射一个新的单词计数。
    有一些不同的流分组类型:
    最简单的分组类型被称为” shuffle grouping”,发送元组给随机的任务。   WordCountTopology使用一个shuffle grouping来从RandomSentenceSpout向SplitSentence bolt发送任务。结果就是,处理元组的工作被平均分配在SpliteSentence bolt的所有任务中。
    更有意思的一种分组类型是”fields grouping”. fields grouping被用于SplitSentence bolt和WordCount bolt之间。WordCount bolt中一个很关键的功能是想通的单子总是去往同一个任务。否则,多余一个的任务会看到相同的单词,它们会分别发射一个错误的值来计数,因为每一个都只有不完整的信息。fields grouping允许你通过字段的子集来分组一个流。这导致字段子集中相等的值去往同一个任务。因为WordCount使用fields grouping在word字段上订阅了SplitSentence的输出流,相同的单词总是去往相同的任务,这样bolt就产生了正确的输出。
    fields grouping是实现流连接和流聚合的基础,也适用于很多其他情形。进一步说,fields grouping是通过使用取模哈希实现的。
    还有其他的几种流分组类型。请参考本系列的后续文章(译者注)。

    使用其他语言定义Bolt(Defining Bolts in other languages)
    可以使用任何语言来定义bolt。使用其他语言编写的bolt作为子进程来执行,Storm使用标准输入输出上的JSON消息来与这些子进程通信。通信协议只需要大约100行的一些适配库,Storm自带了Ruby,Python和Fancy的适配库。
    下面是WordCountTopology中的SplitSentence的定义:
public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

    SplitSentence重写了ShellBolt,并且声明以splitsentence.py为参数使用python来运行。下面是splitsentence.py的实现:
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

    使用其他语言编写spout和bolt的更多信息,以及如何使用其他语言创建拓扑(完全避免JVM),请参看本系列后续文章(译者注)。

    保证消息处理(Guaranteeing message processing)
    在这个教程的早期版本中,我们跳过了元组如何被发射的一些方面。这些方面属于Storm reliability API:Storm如何保证每一个来自spout的消息被完全的处理。保证消息处理是如何运转的,以及你可以怎么做来获得Storm可靠性能力的相关信息,请关注本系列后续文章(译者注)。

    事务性拓扑(Transactional topologies)
    Storm保证每一个消息将会在拓扑中被处理至少一次。常见的一个问题是:你怎么在Storm里面计数?你不会重复计数吗?Storm有一个被称为事务性拓扑的特性允许你在大多数计算场景下实现精确一次消息送达的语义。要想了解更多关于事务性拓扑的信息,请关注本系列后续文章(译者注)。

    分布式RPC(Distributed RPC)
    本教程展示了如何在Storm上进行基本的流处理。你可以使用Storm提供的原语做更多的事情。其中Storm最有意思的应用之一就是分布式RPC,你可以直接并行化功能的计算。要想了解更多关于分布式RPC的信息,请关注本系列后续文章(译者注)。

    总结(Conclusion)
    本教程提供了一个开发、测试和部署Storm拓扑的概览视图。后面的文章将深入到使用Storm的各个方面。
===============================================================================
    大家好,我是阮威。华中科技大学,计算机软件专业硕士。毕业后加入腾讯,先后在腾讯电子商务部和无线游戏产品部工作,现供职于欢聚时代负责基础产品相关工作。IT男,至今。欢迎大家收听我的公众账号"技术与人生"。
Storm中文官方文档翻译计划(1) ——从入门到精通
            
    
    博客分类: 大数据 storm大数据腾讯实时计算Hadoop