配置环境变量不识别sparkk的windows怎么处理无限流

实现批处理的技术许许多多,从各种关系型数据库的sql处理,到大数据领域的MapReduce,Hive,Spark等等。这些都是处理有限数据流的经典方式。而Flink专注的是无限流处理,那么他是怎么做到批处理的呢?

无限流处理:输入数据没有尽头;数据处理从当前或者过去的某一个时间 点开始,持续不停地进行

另一种处理形式叫作有限流处理,即从某一个时间点开始处理数据,然后在另一个时间点结束。输入数据可能本身是有限的(即输入数据集并不会随着时间增长),也可能出于分析的目的被人为地设定为有限集(即只分析某一个时间段内的事件)。

显然,有限流处理是无限流处理的一种特殊情况,它只不过在某个时间点停止而已。此外,如果计算结果不在执行过程中连续生成,而仅在末尾处生成一次,那就是批处理(分批处理数据)。

批处理是流处理的一种非常特殊的情况。在流处理中,我们为数据定义滑 动窗口或滚动窗口,并且在每次窗口滑动或滚动时生成结果。批处理则不同,我们定义一个全局窗口,所有的记录都属于同一个窗口。举例来说, 以下代码表示一个简单的Flink 程序,它负责每小时对某网站的访问者计数,并按照地区分组。

如果知道输入数据是有限的,则可以通过以下代码实现批处理。

Flink 的不寻常之处在于,它既可以将数据当作无限流来处理,也可以将它当作有限流来处理。Flink 的 DataSet API 就是专为批处理而生的,如下所示。

如果输入数据是有限的,那么以上代码的运行结果将与前一段代码的相同, 但是它对于习惯使用批处理器的程序员来说更友好。

Flink 通过一个底层引擎同时支持流处理和批处理

在流处理引擎之上,Flink 有以下机制:

  • 检查点机制和状态机制:用于实现容错、有状态的处理;
  • 水印机制:用于实现事件时钟;
  • 窗口和触发器:用于限制计算范围,并定义呈现结果的时间。

在同一个流处理引擎之上,Flink 还存在另一套机制,用于实现高效的批处理。

  • 用于调度和恢复的回溯法:由 Microsoft Dryad 引入,现在几乎用于所有批处理器;
  • 用于散列和排序的特殊内存数据结构:可以在需要时,将一部分数据从内存溢出到硬盘上;
  • 优化器:尽可能地缩短生成结果的时间。

两套机制分别对应各自的API(DataStream API 和 DataSet API);在创建 Flink 作业时,并不能通过将两者混合在一起来同时 利用 Flink 的所有功能。

在最新的版本中,Flink 支持两种关系型的 API,。这两个 API 都是批处理和流处理统一的 API,这意味着在无边界的实时数据流和有边界的历史记录数据流上,关系型 API 会以相同的语义执行查询,并产生相同的结果。Table API 和 SQL 借助了 来进行查询的解析,校验以及优化。它们可以与 DataStream 和 DataSet API 无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。

Table API / SQL 正在以流批统一的方式成为分析型用例的主要 API。

MapReduce、Tez、Spark 和 Flink 在执行纯批处理任务时的性能比较。测试的批处理任务是 TeraSort 和分布式散列连接。

第一个任务是 TeraSort,即测量为 1TB 数据排序所用的时间。

TeraSort 本质上是分布式排序问题,它由以下几个阶 段组成:

(1) 读取阶段:从 HDFS 文件中读取数据分区;

(2) 本地排序阶段:对上述分区进行部分排序;

(3) 混洗阶段:将数据按照 key 重新分布到处理节点上;

(4) 终排序阶段:生成排序输出;

(5) 写入阶段:将排序后的分区写入 HDFS 文件。

第二个任务是一个大数据集(240GB)和一个小数据集(256MB)之间的分布式散列连接。结果显示,Flink 仍然是速度最快的系统,它所用的时间分别是 Tez 和 Spark 的 1/2 和 1/4.

产生以上结果的总体原因是,Flink 的执行过程是基于流的,这意味着各个处理阶段有更多的重叠,并且混洗操作是流水线式的,因此磁盘访问操作更少。相反,MapReduce、Tez 和 Spark 是基于批的,这意味着数据在通过网络传输之前必须先被写入磁盘。该测试说明,在使用Flink 时,系统空闲时间和磁盘访问操作更少。

值得一提的是,性能测试结果中的原始数值可能会因集群设置、配置和软件版本而异。

因此,Flink 可以用同一个数据处理框架来处理无限数据流和有限数据流,并且不会牺牲性能。

更多Flink相关文章:

非常荣幸有机会和大家分享一下 Apache Pulsar 怎样为批流处理提供融合的存储。希望今天的分享对做大数据处理的同学能有帮助和启发。

这次分享,主要分为四个部分:

  • 介绍与其他消息系统相比, Apache Pulsar 的独特优势
  • 分析批流处理中的存储需求
  • 讲述 Apache Pulsar 如何完美匹配批流处理中的存储需求

Apache Pulsar 是新近开源的一个大规模分布式消息系统,是 Apache 的顶级项目,在 Yahoo 全球数十个机房大规模部署并线上稳定使用了 4 年多。Apache Pulsar 设计中学习和借鉴了其他优秀的分布式系统,在保证一致性和高吞吐的同时,也提供了其他优秀特性,比如支持上百万的 Topic、无缝的多中心互备、灵活的扩展性等。

这里我们简单介绍一下,与其他消息系统相比, Apache Pulsar 拥有的独特优势,大致有以下3点:

  • 独特的软件架构(存储和计算分离,分层分片的存储)
  • 丰富的企业特性(多租户)

从架构上来说,Apache Pulsar 采用了分层和分片的架构。这是 Pulsar 满足批流处理中存储需求的基础。

在 Apache Pulsar 的分层架构中,服务层 Broker 和存储层 BookKeeper 的每个节点都是对等的。Broker 仅仅负责消息的服务支持,不存储数据。这为服务层和存储层提供了瞬时的节点扩展和无缝的失效恢复。

WAL 和数据处理中的流有很多相似性,都是数据源源不断地追加,都对顺序和一致性有严格要求。

BookKeeper 通过 Quorum Vote 的方式来实现数据的一致性,跟 Master/Slave 模式不同,BookKeeper 中每个节点也是对等的,对一份数据会并发地同时写入指定数目的存储节点。对等的存储节点,保证了多个备份可以被并发访问;也保证了存储中即使只有一份数据可用,也可以对外提供服务。

Apache Pulsar 通过分层分片的架构,将逻辑的分区转化为分片来作为存储单元。这为数据的并发访问提供了基础。

除了架构的不同,从用户接口来说,Apache Pulsar 通过订阅的抽象,提供了灵活的消费模型。每一个订阅类似一个 Consumer Group,接收一个 topic 的所有的消息。用户可以使用不同的订阅类型、以不同的模式来共同消费同一个 Topic 中的消息。

如果对顺序性有要求,可以使用 Exclusive 和 Failover 的订阅模式,这样同一个 Topic 只有一个 Consumer 在消费,可以保证顺序性。

如果使用 Shared 订阅模式,多个 Consumer 可以并发消费同一个 Topic。通过动态增加 Consumer 的数量,可以加速 Topic 的消费,减少消息在服务端的堆积。

Pulsar 即将发布的 2.4.0 版本添加了一种新的订阅模式: KeyShared。KeyShared 模式保证在 Shared 模式下同一个 Key 的消息也会发送到同一个 Consumer,在并发的同时也保证了顺序性。

Apache Pulsar 灵活的消费模型,避免了因为不同的消费场景需要部署多套消息系统的场景,消除了数据生产端的数据分离。

此外,Apache Pulsar 是以多租户为基础的丰富的企业级特性。企业内部可以搭建一套 Pulsar 集群,在集群中给各个部门分配不同的租户,并设置租户的管理权限。租户的管理员再根据部门的不同业务和场景需求,创建不同的 Namespace。在 Namespace 中可以设置管理策略,比如流控,Quota,互备的集群,数据副本数等。这样为 Topic 的管理提供了一个层级的可控的视图。

Apache Pulsar 的企业级特性,为企业搭建统一大集群提供了基础,方便了集群的管理和数据的共享。

以上是关于 Apache Pulsar 的简单介绍,欢迎参阅 Apache Pulsar 的官网和微信公众号了解更多内容。

在大数据处理刚刚兴起的时候,一般用户会采用 λ 架构,维护批流两套系统:批系统主要处理历史数据; 流系统处理实时的数据,对批系统的结果进行补充来提高时效。两套系统造成数据冗余,增加维护成本。

在存储层,批处理常使用 HDFS 和网络对象存储等;流处理常使用 Kafka 或其他的消息系统。

为了解决 λ 架构的问题,逐渐演化出 κ 架构,使用一套系统来满足实时数据处理和历史数据处理的需求。

在 κ 架构中,数据的“可重复处理”是关键。一方面要求实时数据能及时获取最新数据,处理完立即导出给其他系统使用;另一方面要满足处理历史数据的需求,需要具备读大量历史数据的能力。实时数据的处理决定了必须使用消息系统,但是消息系统并不能完全满足批处理的并发需求。

在前面的分享中,百度和阿里的专家分享了计算层的批流融合。我们认为批流融合存储层的需求是一个融合的存储表征: 消息系统 + 并发的存储访问。

为什么 Apache Pulsar 能满足批流处理中的存储需求

下面我们从 “Apache Pulsar 提供的存储抽象”、“批流处理中的 IO 模式”和 “Apache Pulsar 提供的无限流存储” 这三个方面来解释为什么 Apache Pulsar 能满足批流融合的存储需求。

Pulsar 灵活的订阅模式和高带宽、低延迟特性,能够很好的满足流处理的需求。

Apache Pulsar 的 Topic 可以分为不同的分区。和其他消息系统不同的是 Apache Pulsar 利用分片的架构,每个逻辑分区又进行了分片。

在分层分片的架构中,分片是存储的单元,可以类比 HDFS 中的一个文件块,分片被均匀地分布在存储层的 BookKeeper 节点中。

我们再从批流处理的角度来看 Apache Pulsar 的这种分片(Segment)的架构:

  • 对于流处理来说,Apache Pulsar 的每个 Partition 就是流处理的一个流,它通过 Pub/Sub 的接口来给流处理提供数据交互。
  • 对于批处理来说,Apache Pulsar 以分片为粒度,可以为批处理提供数据的并发访问。

另一方面, Apache Pulsar 的 Partition 是逻辑分区的概念,分区内部又被分成分片,作为存储和 IO 访问的单元。

匹配批流处理中的 IO 模式

Apache Pulsar 的写先发送到 broker,然后 broker 作为存储代理,并发将数据发送给存储层的多个 Bookie 节点。两种架构都会有两次网络跳跃。

对于 Write 模式,延迟差别不大。

Tailing Read 是流处理中的常用模式。它从 Stream 的尾部读取最新写入的数据。

两种架构都只有 1 次网络跳跃。对 Tailing Read 模式,延迟差别不大。

Catchup Read 是批处理中常用的读取模式。它从 Stream 的指定位置,读取一定量的历史数据。这种场景一般对数据的读取量比较大,注重读取的带宽。

接口,直接从存储层并发访问多个分片。BookKeeper 提供了多副本的高可用,提升了读取历史数据的并发能力。

如果我们把这三种 IO 模式放在一起看就更有意思了。 这可以类比用户在某时间段,对 Stream 既有最新数据读写,也有历史数据读写的情形。这是在批流融合中经常遇到的场景。

对和 Kafka 类似的系统,这三种 IO 模式都会发生在 Leader Broker。在 Leader Broker 中,系统的数据都需要通过文件系统的 Pagecache,历史数据和最新的数据会争用 Pagecache 资源,造成读写响应不及时。

如果这时再遇到 Broker 磁盘空间写满,需要扩容的情况,那就需要等待数据的搬移和 rebalance 的操作。这时,IO 的延迟和服务质量很难得到保障。

对历史数据的并发读写,直接发生在存储节点。冷热数据被天然隔离,用户完全不用担心 IO 的冲突和争用。Apache Pulsar 在节点扩容和错误恢复的过程中,也不会有数据大量拷贝和 rebalance,因此提升了系统的高可用性。

通过这三种 IO 模式的说明和对比,我们发现 Pulsar Segmented Stream 的存储表征,再结合分层分片的架构,可以很好地满足批流处理中对存储系统的需求。

Pulsar Segmented Stream 的存储表征,很好地模拟了现实中 Stream 数据。对于流存储的另一个需求是理论上无限的存储空间。这样可以满足对历史数据的存储和访问需求。Apache Pulsar 从两个方面解决了这个问题。

一方面 Pulsar 的存储层中,分片会均衡地分布到所有的存储节点中,这避免了其他系统中单一broker 存储容量的限制,进而可以利用整个集群的存储空间。

另一方面,Pulsar 的分片架构,为数据的二级存储扩展提供了很好的基础。对于Segmented Stream,用户可以设置 Segment 在 BookKeeper 中保留的时间或大小。如果超过设定的值,将旧的 Segment 迁移到廉价的二级存储,比如 Aws S3,Google Cloud Storage,或者HDFS 中。二级存储的带宽一般有保障,可以满足历史数据的批处理模式。 通过二级存储可以减轻无限存储的成本。

Pulsar 利用自身的分层分片的架构,提供了 Segmented Stream 的存储表征,满足了批流融合的存储需求。

从批流处理的 IO 模式分析中可以发现,Pulsar 的架构可以很好地处理批流处理中的 IO 并发和隔离。并且 Pulsar 提供了理论上无限流存储的能力,能够满足批处理中,对海量历史数据的存储需求。

怎样使用 Pulsar 提供批流融合的存储

前面我们介绍了为什么 Pulsar 的架构能满足批流融合的存储需求。接着我们会介绍 Pulsar是如何在工程上实现的。

基于 Segmented Stream 存储的表征,我们很容易区分和支持批处理和流处理。批处理所请求的数据可以看做是一个有边界的流(Bounded Stream)。流处理所请求的数据可以看做是一个没有边界的流(UnBounded Stream)。

这里的代码是一个计算广告点击率的 SQL 语句。如果用户想要查询某个时间段内的点击率,会提供点击事件的起止时间。起止时间可以确定一个流的起止边界,进而确定一个 Bounded Stream。这是一个典型的批处理场景。

流处理是一系列不会停止的 Windows 访问和查询。与批处理相比,流处理它没有截止的时间点,即使查询到当前时刻,它仍然继续对当前的 window 不断地查询,一个 window 处理结束,接着处理下一个 window。它的 SQL 查询语句不会变化,但是查询 window 中的数据会不断实时更新,它是一个源源不断的、不停处理最新数据的方式。

对于这种访问模式,直接使用 Pulsar 的 pub/sub 接口就可以直接获取最新的消息,满足流处理的需求。

对批流融合,在计算层,更多关注的是批流融合的计算模型、API 和运行时的统一。在存储层,通过 Segmented Stream 的存储表征,为批流数据提供了统一的数据存储和组织方式。

对于批处理的接口,我们在 Pulsar SQL 里面做了一个尝试,Pulsar SQL 借助 Presto,对写入Pulsar 中的数据进行交互式的查询。

Pub/Sub 的接口已经比较完善,我们最近在丰富和完善 PSegment 接口。

在 PSegment 中,我们的主要工作是集成Pulsar 和 Flink、Spark、Hive 及 Presto 。这些工作主要集中在 API 的实现和 Schema 的整合。这些工作完成之后,我们会开源这部分的代码。

Pulsar 是下一代云原生的消息和流存储的平台。我们认为消息和流是一份数据的两种不同表征方式。Pulsar 采用了存储计算分离的分层架构和分区内再分片的存储架构,这种架构能够提供基于Segmented Stream 的存储表征,能为批和流处理提供融合的存储基础。

作者翟佳,StreamNative 联合创始人兼 CTO,本文为其 InfoQ 技术大会演讲的内容整理。

Flink安装在CentOS7上,默认时间是UTC时间,查看Flink日志,发现输出时间比当前时间晚8个小时. 通过如下命令,调整成北京时间 cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime 但是查看Flink输出的日志时间格式,输出时间仍然比当前时间晚8个小时! 经过研究,以下操作,可以解决日志输出时间比当前时间晚8小时的问题.

我要回帖

更多关于 配置环境变量不识别spark 的文章

 

随机推荐