storm 分布式实时计算系统介绍

Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。

在Storm之前,进行实时处理是非常痛苦的事情: 需要维护一堆消息队列和消费者,他们构成了非常复杂的图结构。消费者进程从队列里取消息,处理完成后,去更新数据库,或者给其他队列发新消息。

这样进行实时处理是非常痛苦的。我们主要的时间都花在关注往哪里发消息,从哪里接收消息,消息如何序列化,真正的业务逻辑只占了源代码的一小部分。一个应用程序的逻辑运行在很多worker上,但这些worker需要各自单独部署,还需要部署消息队列。最大问题是系统很脆弱,而且不是容错的:需要自己保证消息队列和worker进程工作正常。

Storm完整地解决了这些问题。它是为分布式场景而生的,抽象了消息传递,会自动地在集群机器上并发地处理流式计算,让你专注于实时处理的业务逻辑。

Storm的特点

Storm有如下特点:

  1. 编程简单:开发人员只需要关注应用逻辑,而且跟Hadoop类似,Storm提供的编程原语也很简单
  2. 高性能,低延迟:可以应用于广告搜索引擎这种要求对广告主的操作进行实时响应的场景。
  3. 分布式:可以轻松应对数据量大,单机搞不定的场景
  4. 可扩展: 随着业务发展,数据量和计算量越来越大,系统可水平扩展
  5. 容错:单个节点挂了不影响应用
  6. 消息不丢失:保证消息处理

不过Storm不是一个完整的解决方案。使用Storm时你需要关注以下几点:

  1. 如果使用的是自己的消息队列,需要加入消息队列做数据的来源和产出的代码
  2. 需要考虑如何做故障处理:如何记录消息队列处理的进度,应对Storm重启,挂掉的场景
  3. 需要考虑如何做消息的回退:如果某些消息处理一直失败怎么办?

Storm的应用

跟Hadoop不一样,Storm是没有包括任何存储概念的计算系统。这就让Storm可以用在多种不同的场景下:非传统场景下数据动态到达或者数据存储在数据库这样的存储系统里(或数据是被实时操控其他设备的控制器(如交易系统)所消费)

很多初学者,对大数据的概念都是模糊不清的,大数据是什么,能做什么,学的时候,该按照什么线路去学习,学完往哪方面发展,想深入了解,想学习的同学欢迎加入大数据学习扣扣君:四一0加上三九一连起来七四四,有大量干货(零基础以及进阶的经典实战)分享给大家,并且有清华大学毕业的资深大数据讲师给大家免费授课

Storm有很多应用:实时分析,在线机器学习(online machine learning),连续计算(continuous computation),分布式远程过程调用(RPC)、ETL等。Storm处理速度很快:每个节点每秒钟可以处理超过百万的数据组。它是可扩展(scalable),容错(fault-tolerant),保证你的数据会被处理,并且很容易搭建和操作。

例如Nathan Marz提供的,产生Twitter的趋势信息。Twitter从海量推文中抽取趋势信息,并在本地区域和国家层级进行维护。这意味者一旦一个案例开始出现,Twitter的话题趋势算法就能实时的鉴别出这个话题。这个实时的算法就是通过在Storm上连续分析Twitter数据来实现的。

其他开源的大数据解决方案

下表列出了一组开源的大数据解决方案,包括传统的批处理和流式处理的应用程序。

解决方案

开发者

类型

描述

Storm

Twitter

流式处理

Twitter的流式处理大数据分析方案

S4

Yahoo!

流式处理

Yahoo!的分布式流式计算平台

Hadoop

Apache

批处理

MapReduce范式的第一个开源实现

Spark

UC Berkeley AMPLab

批处理

支持内存数据集和弹性恢复的分析平台

Yahoo! S4和Storm之间的关键差别是Storm在故障的情况下可以保证消息的处理,而S4可能会丢消息。

Hadoop无疑是大数据分析的王者,本质上是一个批量处理系统,它专注于大数据的批量处理。数据存储在Hadoop 文件系统里(HDFS)并在处理的时候分发到集群中的各个节点。当处理完成,产出的数据放回到HDFS上。在Storm上构建的拓扑处理的是持续不断的流式数据。不同于Hadoop的任务,这些处理过程不会终止,会持续处理到达的数据。

Hadoop处理的是静态的数据,而Storm处理的是动态的、连续的数据。Twitter的用户每天都会发上千万的推,所以这种处理技术是非常有用的。Storm不仅仅是一个传统的大数据分析系统:它是一个复杂事件处理系统的例子。复杂事件处理系统通常是面向检测和计算的,这两部分都可以通过用户定义的算法在Storm中实现。例如,复杂事件处理可以用来从大量的事件中区分出有意义的事件,然后对这些事件实时处理。

Storm模型

Storm实现了一个数据流(data flow)的模型,在这个模型中数据持续不断地流经一个由很多转换实体构成的网络。一个数据流的抽象叫做流(stream),流是无限的元组(Tuple)序列。元组就像一个可以表示标准数据类型(例如int,float和byte数组)和用户自定义类型(需要额外序列化代码的)的数据结构。每个流由一个唯一的ID来标示的,这个ID可以用来构建拓扑中各个组件的数据源。

如下图所示,其中的水龙头代表了数据流的来源,一旦水龙头打开,数据就会源源不断地流经Bolt而被处理。图中有三个流,用不同的颜色来表示,每个数据流中流动的是元组(Tuple),它承载了具体的数据。元组通过流经不同的转换实体而被处理。

Storm对数据输入的来源和输出数据的去向没有做任何限制。像Hadoop,是需要把数据放到自己的文件系统HDFS里的。在Storm里,可以使用任意来源的数据输入和任意的数据输出,只要你实现对应的代码来获取/写入这些数据就可以。典型场景下,输入/输出数据来是基于类似Kafka或者ActiveMQ这样的消息队列,但是数据库,文件系统或者web服务也都是可以的。

topology

概念

Storm中涉及的主要概念有:

1. 拓扑(Topologies) 2.元组(Tuple) 3.流(Streams) 4.Spouts(喷嘴) 5.Bolts 6.任务(Tasks) 7.组件(Component) 8.流分组(Stream groupings) 9.可靠性(Reliability) 10.Workers(工作进程)

可以看到Storm中各个概念的名字起的非常好,也很形象。

拓扑(Topologies) 一个Storm拓扑打包了一个实时处理程序的逻辑。一个Storm拓扑跟一个MapReduce的任务(job)是类似的。主要区别是MapReduce任务最终会结束,而拓扑会一直运行(当然直到你杀死它)。一个拓扑是一个通过流分组(stream grouping)把Spout和Bolt连接到一起的拓扑结构。图的每条边代表一个Bolt订阅了其他Spout或者Bolt的输出流。一个拓扑就是一个复杂的多阶段的流计算。

资源

TopologyBuilder: 使用这个类来在Java中创建拓扑 在生产集群中运行拓扑 本地模式: 通过阅读这篇可以学习到如何在本地模式下进行拓扑的开发和测试 元组(Tuple) 元组是Storm提供的一个轻量级的数据格式,可以用来包装你需要实际处理的数据。元组是一次消息传递的基本单元。一个元组是一个命名的值列表,其中的每个值都可以是任意类型的。元组是动态地进行类型转化的--字段的类型不需要事先声明。在Storm中编程时,就是在操作和转换由元组组成的流。通常,元组包含整数,字节,字符串,浮点数,布尔值和字节数组等类型。要想在元组中使用自定义类型,就需要实现自己的序列化方式。

资源

流是Storm中的核心抽象。一个流由无限的元组序列组成,这些元组会被分布式并行地创建和处理。通过流中元组包含的字段名称来定义这个流。 每个流声明时都被赋予了一个ID。只有一个流的Spout和Bolt非常常见,所以提供了不需要指定ID来声明一个流的函数(Spout和Bolt都需要声明输出的流)。这种情况下,流的ID是默认的“default”。

资源

  • 用来声明流和流的定义
  • Storm元组的动态类型转化,声明自定义的序列化方式
  • 自定义的序列化必须实现这个接口
  • 可以通过这个配置来注册自定义的序列化接口

Spout(喷嘴,这个名字很形象)是Storm中流的来源。通常Spout从外部数据源,如消息队列中读取元组数据并吐到拓扑里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout能够在一个元组被Storm处理失败时重新进行处理,而非可靠的Spout只是吐数据到拓扑里,不关心处理成功还是失败了。

Spout可以一次给多个流吐数据。此时需要通过的declareStream函数来声明多个流并在调用提供的emit方法时指定元组吐给哪个流。

Spout中最主要的函数是nextTuple,Storm框架会不断调用它去做元组的轮询。如果没有新的元组过来,就直接返回,否则把新元组吐到拓扑里。nextTuple必须是非阻塞的,因为Storm在同一个线程里执行Spout的函数。

Spout中另外两个主要的函数是ackfail。当Storm检测到一个从Spout吐出的元组在拓扑中成功处理完时调用ack,没有成功处理完时调用fail。只有可靠型的Spout会调用ackfail函数。

Bolts 在拓扑中所有的计算逻辑都是在Bolt中实现的。一个Bolt可以处理任意数量的输入流,产生任意数量新的输出流。Bolt可以做函数处理,过滤,流的合并,聚合,存储到数据库等操作。Bolt就是流水线上的一个处理单元,把数据的计算处理过程合理的拆分到多个Bolt、合理设置Bolt的task数量,能够提高Bolt的处理能力,提升流水线的并发度。

Bolt可以给多个流吐出元组数据。此时需要使用的declareStream方法来声明多个流并在使用[OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)emit方法时指定给哪个流吐数据。

当你声明了一个Bolt的输入流,也就订阅了另外一个组件的某个特定的输出流。如果希望订阅另一个组件的所有流,需要单独挨个订阅。InputDeclarer有语法糖来订阅ID为默认值的流。例如declarer.shuffleGrouping("redBolt")订阅了redBolt组件上的默认流,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)是相同的。

在Bolt中最主要的函数是execute函数,它使用一个新的元组当作输入。Bolt使用OutputCollector对象来吐出新的元组。Bolts必须为处理的每个元组调用OutputCollectorack方法以便于Storm知道元组什么时候被各个Bolt处理完了(最终就可以确认Spout吐出的某个元组处理完了)。通常处理一个输入的元组时,会基于这个元组吐出零个或者多个元组,然后确认(ack)输入的元组处理完了,Storm提供了接口来自动完成确认。

必须注意OutputCollector不是线程安全的,所以所有的吐数据(emit)、确认(ack)、通知失败(fail)必须发生在同一个线程里。更多信息可以参照问题。

资源

  • 这是Bolt的通用接口
  • 很方便的Bolt接口,用于定义做过滤或者简单处理的Bolt
  • Bolt通过这个类的实例来吐元组给输出流

任物

每个Spout和Bolt会以多个任务(Task)的形式在集群上运行。每个任务对应一个执行线程,流分组定义了如何从一组任务(同一个Bolt)发送元组到另外一组任务(另外一个Bolt)上。可以在调用TopologyBuildersetSpoutsetBolt函数时设置每个Spout和Bolt的并发数。

组件(component)是对Bolt和Spout的统称

定义拓扑的时候,一部分工作是指定每个Bolt应该消费哪些流。流分组定义了一个流在一个消费它的Bolt内的多个任务(task)之间如何分组。流分组跟计算机网络中的路由功能是类似的,决定了每个元组在拓扑中的处理路线。

在Storm中有七个内置的流分组策略,你也可以通过实现接口来自定义一个流分组策略:

  1. 洗牌分组(Shuffle grouping): 随机分配元组到Bolt的某个任务上,这样保证同一个Bolt的每个任务都能够得到相同数量的元组。
  2. 字段分组(Fields grouping): 按照指定的分组字段来进行流的分组。例如,流是用字段“user-id"来分组的,那有着相同“user-id"的元组就会分到同一个任务里,但是有不同“user-id"的元组就会分到不同的任务里。这是一种非常重要的分组方式,通过这种流分组方式,我们就可以做到让Storm产出的消息在这个"user-id"级别是严格有序的,这对一些对时序敏感的应用(例如,计费系统)是非常重要的。
  3. Partial Key grouping: 跟字段分组一样,流也是用指定的分组字段进行分组的,但是在多个下游Bolt之间是有负载均衡的,这样当输入数据有倾斜时可以更好的利用资源。很好的解释了这是如何工作的,有哪些优势。
  4. All grouping: 流会复制给Bolt的所有任务。小心使用这种分组方式。在拓扑中,如果希望某类元祖发送到所有的下游消费者,就可以使用这种All grouping的流分组策略。
  5. Global grouping: 整个流会分配给Bolt的一个任务。具体一点,会分配给有最小ID的任务。
  6. 不分组(None grouping): 说明不关心流是如何分组的。目前,None grouping等价于洗牌分组。
  7. Direct grouping:一种特殊的分组。对于这样分组的流,元组的生产者决定消费者的哪个任务会接收处理这个元组。只能在声明做直连的流(direct streams)上声明Direct groupings分组方式。只能通过使用emitDirect系列函数来吐元组给直连流。一个Bolt可以通过提供的TopologyContext来获得消费者的任务ID,也可以通过OutputCollector对象的emit函数(会返回元组被发送到的任务的ID)来跟踪消费者的任务ID。在ack的实现中,Spout有两个直连输入流,ack和ackFail,使用了这种直连分组的方式。
  8. Local or shuffle grouping:如果目标Bolt在同一个worker进程里有一个或多个任务,元组就会通过洗牌的方式分配到这些同一个进程内的任务里。否则,就跟普通的洗牌分组一样。这种方式的好处是可以提高拓扑的处理效率,因为worker内部通信就是进程内部通信了,相比拓扑间的进程间通信要高效的多。worker进程间通信是通过使用Netty来进行网络通信的。

资源

  • 使用这个类来定义拓扑
  • 当调用TopologyBuildersetBolt函数时会返回这个对象,它用来声明一个Bolt的输入流并指定流的分组方式。
  • 这个Bolt对于分布式的RPC拓扑很有用,大量使用了直连流(direct streams)和直连分组(direct groupings)

Storm保证了拓扑中Spout产生的每个元组都会被处理。Storm是通过跟踪每个Spout所产生的所有元组构成的树形结构并得知这棵树何时被完整地处理来达到可靠性。每个拓扑对这些树形结构都有一个关联的“消息超时”。如果在这个超时时间里Storm检测到Spout产生的一个元组没有被成功处理完,那Sput的这个元组就处理失败了,后续会重新处理一遍。

为了发挥Storm的可靠性,需要你在创建一个元组树中的一条边时告诉Storm,也需要在处理完每个元组之后告诉Storm。这些都是通过Bolt吐元组数据用的OutputCollector对象来完成的。标记是在emit函数里完成,完成一个元组后需要使用ack函数来告诉Storm。

这些都在一文中会有更详细的介绍。

拓扑以一个或多个Worker进程的方式运行。每个Worker进程是一个物理的Java虚拟机,执行拓扑的一部分任务。例如,如果拓扑的并发设置成了300,分配了50个Worker,那么每个Worker执行6个任务(作为Worker内部的线程)。Storm会尽量把所有的任务均分到所有的Worker上。

资源

  • Config.TOPOLOGY_WORKERS: 这个配置设置了执行拓扑时分配Worker的数量。

Storm中用到的技术

提供了可扩展环境下的传输层高效消息通信,一开始Storm的内部通信使用的是ZeroMQ,后来作者想把Storm移交给Apache开源基金会来管理,而ZeroMQ的许可证书跟Apache基金会的政策有冲突。在Storm中,Netty比ZeroMQ而且提供了worker间通信时的验证机制,所以在就改用了。

 Storm系统的实现语言。Clojure是由Rich Hicky作为一种通用语言发明的,它衍生自Lisp语言,简化了多线程编程。

Zookeeper是一个实现高可靠的分布式协作的开源项目。Storm使用Zookeeper来协调集群中的多个节点。