申请专栏作者
投稿发布
您的当前位置:主页 > yabo亚博体育下载 > 正文

Apache Beam 实战指南 | 大yabo88滚球管道 (pipeline) 设计及

来源: 时间:2019-08-22
请点击下面的广告后浏览!

作者:张海涛

关于 Apache Beam 实战指南系列文章 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

随着大yabo88滚球 2.0 时代悄然到来,大yabo88滚球从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大yabo88滚球应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发者经常要用到不同的技术、框架、API、开发语言和 SDK 来应对复杂应用的开发,这大大增加了选择合适工具和框架的难度,开发者想要将所有的大yabo88滚球组件熟练运用几乎是一项不可能完成的任务。 可思yabo88滚球sykv.com,sykv.cn

面对这种情况,Google 在 2016 年 2 月宣布将大yabo88滚球流水线产品(Google DataFlow)贡献给 Apache 基金会孵化,2017 年 1 月 Apache 对外宣布开源 Apache Beam,2017 年 5 月迎来了它的第一个稳定版本 2.0.0。在国内,大部分开发者对于 Beam 还缺乏了解,社区中文资料也比较少。 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

可思yabo88滚球sykv.com

一.概述 内容来自可思yabo88滚球sykv.com

其他行业问咱们 IT 具体干什么的,很多 IT 人员会自嘲自己就是“搬砖”(此处将复制代码称为搬砖)的民工。过了两天 GitHub 出现自动写代码的人工智能,IT 程序员深深叹了一口气说道“完了要失业了,代码没得搬了”。其实从入行 IT 那一刻起,不管我们做前端、服务端、底层架构等任何岗位,其实我们都是为yabo88滚球服务的服务人员(注:不是说从民工转岗到服务员了):把yabo88滚球从后端搬到前端,把前端yabo88滚球再写入yabo88滚球库。尽管编程语言从 C、C++、C#、JAVA、Python 不停变化,为了适应时代背景框架也是千变万化,我们拼命从“亚马逊热带雨林”一直学到“地中海”。

本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

然后 Apache Beam 这个一统“地中海”的框架出现了。Apache Beam 不光统一了yabo88滚球源,还统一了流批计算。在这个yabo88滚球传输过程中有一条核心的技术就是管道(Pipeline),不管是 Strom,Flink ,Beam 它都是核心。在这条管道中可以对yabo88滚球进行过滤、净化、清洗、合并、分流以及各种实时计算操作。

可思yabo88滚球-人工智能资讯平台sykv.com

本文会详细介绍如何设计 Apache Beam 管道、管道设计工具介绍、源码和案例分析,普及和提升大家对 Apache Beam 管道的认知。

内容来自可思yabo88滚球sykv.com

二.怎样设计好自己的管道?

可思yabo88滚球-人工智能资讯平台sykv.com

设计管道注意事项

可思yabo88滚球sykv.com,sykv.cn

? 可思yabo88滚球sykv.com

? 可思yabo88滚球sykv.com

图 2-1 简单管道

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

1. 你输入的yabo88滚球存储在那里? 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

首先要确定你要构造几条yabo88滚球源,在 Beam 可以构建多条,构建之前可以选择自己的 SDK 的 IO。

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

2. 你的yabo88滚球类型是什么样的?

内容来自可思yabo88滚球sykv.com

Beam 提供的是键值对的yabo88滚球类型,你的yabo88滚球可能是日志文本、格式化设备事件、yabo88滚球库的行,所以在 PCollection 就应该确定yabo88滚球集的类型。 可思yabo88滚球sykv.com

3. 你想怎么处理yabo88滚球? 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

对yabo88滚球进行转换、过滤处理、窗口计算、SQL 处理等。 在管道中提供了通用的 ParDo 转换类,算子计算以及 BeamSQL 等操作。 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

4. 你打算把yabo88滚球最后输出到哪里去? 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

在管道末尾进行 Write 写入操作,把yabo88滚球最后写入你自己想存放或最后流向的地方。

本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

管道的几种玩法 可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

1. 分支管道:多次转换,处理相同的yabo88滚球集

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

?

可思yabo88滚球-人工智能资讯平台sykv.com

?

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

图 2-2-1 多次转换处理相同yabo88滚球示意图

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

描述:例如上图 2-1-1 图所示,从一个yabo88滚球库的表读取或转换yabo88滚球集,然后从yabo88滚球集中分别找找以字母“A”开头的yabo88滚球放入一个分支yabo88滚球集中,如果以字母“B”开头的yabo88滚球放入另一个分支yabo88滚球集中,最终两个yabo88滚球集进行隔离处理。

可思yabo88滚球sykv.com,sykv.cn

yabo88滚球集: 可思yabo88滚球-www.sykv.cn,sykv.com

// 为了演示显示内存yabo88滚球集
final List LINES = Arrays.asList(
"Aggressive",
"Bold",
"Apprehensive",
"Brilliant");

本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

示例代码:

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

PCollection dbRowCollection = ...;// 这个地方可以读取任何yabo88滚球源。
PCollection aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){// 查找以 "A" 开头的yabo88滚球
c.output(c.element());
System.out.append("A 开头的单词有:"+c.element()+"\r");
}
}
}));
PCollection bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){// 查找以 "A" 开头的yabo88滚球
c.output(c.element());
System.out.append("B 开头的单词有:"+c.element()+"\r");
}
}
}));
可思yabo88滚球-人工智能资讯平台sykv.com

最终结果展示: 可思yabo88滚球sykv.com,sykv.cn

A 开头的单词有:Aggressive
B 开头的单词有:Bold
A 开头的单词有:Apprehensive
B 开头的单词有:Brilliant 内容来自可思yabo88滚球sykv.com

原示例代码地址 : pipelineTest2_1

可思yabo88滚球sykv.com

2. 分支管道:一次转换,输出多个yabo88滚球集 可思yabo88滚球sykv.com

? 可思yabo88滚球-www.sykv.cn,sykv.com

? 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

图 2-2-2 一次转换多个输出示意图 可思yabo88滚球sykv.com

描述:根据图 2-2-1 和图 2-2-2 图中可以看出,他们以不同的方式执行着相同的操作,图 2-2-1 中的管道包含两个转换,用于处理同一输入中的元素 PCollection。一个转换使用以下逻辑:

内容来自可思yabo88滚球sykv.com

if(以'A'开头){outputToPCollectionA}

可思yabo88滚球-www.sykv.cn,sykv.com

另一个转换为 可思yabo88滚球-www.sykv.cn,sykv.com

if(以'B'开头){outputToPCollectionB} 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

因为每个转换读取整个输入 PCollection,所以输入中的每个元素都会 PCollection 被处理两次。

可思yabo88滚球sykv.com,sykv.cn

图 2-2-2 中的管道以不同的方式执行相同的操作 - 只有一个转换使用以下逻辑: 可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

if(以'A'开头){outputToPCollectionA} else if(以'B'开头){outputToPCollectionB} 可思yabo88滚球-人工智能资讯平台sykv.com

其中输入中的每个元素都 PCollection 被处理一次。

可思yabo88滚球-人工智能资讯平台sykv.com

yabo88滚球集:同 2-1-1 yabo88滚球集 可思yabo88滚球sykv.com,sykv.cn

示例代码: 可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

// 定义两个 TupleTag,每个输出一个。
final TupleTag startsWithATag = new TupleTag(){};
final TupleTag startsWithBTag = new TupleTag(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// 返回首字母带有 "A" 的yabo88滚球集。
c.output(c.element());
} else if(c.element().startsWith("B")) {
// // 返回首字母带有 "B" 的yabo88滚球集。
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag. 可思yabo88滚球sykv.com
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);
本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

如果每个元素的转换计算非常耗时,则使用其他输出会更有意义,因为一次性过滤全部yabo88滚球,比全部yabo88滚球过滤两次从性能上和转换上都存在一定程度上提升,yabo88滚球量越大越明显。

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

最终结果展示:

本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

复制代码A 开头的单词有:ApprehensiveA 开头的单词有:AggressiveB 开头的单词有:BrilliantB 开头的单词有:Bold

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

原示例代码地址 : pipelineTest2_2 可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

3. 合并管道:多个yabo88滚球集,合并成一个管道输出

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

? 内容来自可思yabo88滚球sykv.com

? 内容来自可思yabo88滚球sykv.com

图 2-2-3 多yabo88滚球集合并输出图 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

描述:

可思yabo88滚球-人工智能资讯平台sykv.com

上图 2-2-3 是接图 2-2-1 的继续,把带“A” 的yabo88滚球和带“B” 字母开头的yabo88滚球进行合并到一个管道。

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

这个地方注意点是 Flatten 用法必须两个yabo88滚球的yabo88滚球类型相同。 内容来自可思yabo88滚球sykv.com

yabo88滚球集:

可思yabo88滚球-www.sykv.cn,sykv.com

// 为了演示显示内存yabo88滚球集
final List LINESa = Arrays.asList(
"Aggressive",
"Apprehensive");
final List LINESb = Arrays.asList(
"Bold",
"Brilliant");
可思yabo88滚球-www.sykv.cn,sykv.com

示例代码: 内容来自可思yabo88滚球sykv.com

// 将两个 PCollections 与 Flatten 合并
PCollectionList collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection mergedCollectionWithFlatten = collectionList
.apply(Flatten.pCollections());
// 继续合并新的 PCollection
mergedCollectionWithFlatten.apply(...);
可思yabo88滚球-人工智能资讯平台sykv.com

结果展示: 可思yabo88滚球-人工智能资讯平台sykv.com

合并单词单词有:
Aggressive
Brilliant
Apprehensive
Bold

内容来自可思yabo88滚球sykv.com

原示例代码地址 : pipelineTest2_3 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

4. 合并管道:多个yabo88滚球源,链接合并一个管道输出 可思yabo88滚球-人工智能资讯平台sykv.com

?

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

? 可思yabo88滚球sykv.com,sykv.cn

图 2-2-4 多yabo88滚球源合并输出图

可思yabo88滚球sykv.com,sykv.cn

描述: 可思yabo88滚球sykv.com

你的管道可以从一个或多个源读取或输入。如果你的管道从多个源读取并且这些源中的yabo88滚球相关联,则将输入连接在一起会很有用。在上面的图 2-2-4 所示的示例中,管道从yabo88滚球库表中读取名称和地址,并从 Kafka 主题中读取名称和订单号。然后管道 CoGroupByKey 用于连接此信息,其中键是名称 ; 结果 PCollection 包含名称,地址和订单的所有组合。 可思yabo88滚球sykv.com,sykv.cn

示例代码: 可思yabo88滚球sykv.com

PCollection<>> userAddress = pipeline.apply(JdbcIO.<>>read()...);
PCollection<>> userOrder = pipeline.apply(KafkaIO.read()...);
final TupleTag addressTag = new TupleTag();
final TupleTag orderTag = new TupleTag();
// 将集合值合并到 CoGbkResult 集合中。
PCollection<>> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.create());
joinedCollection.apply(...);
可思yabo88滚球-人工智能资讯平台sykv.com

<><><><>

管道的设计工具 可思yabo88滚球-www.sykv.cn,sykv.com

对于管道的设计不光用代码去实现,也可以用视图工具。现在存在的有两种一种是拓蓝公司出品叫 Talend Big Data Studio,另一种就是免费开源的视图设计工具 kettle-beam 。

可思yabo88滚球sykv.com

?

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

? 内容来自可思yabo88滚球sykv.com

三.怎样创建你的管道 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

Apache Beam 程序从头到尾就是处理yabo88滚球的管道。本小节使用 Apache Beam SDK 中的类构建管道,一个完整的 Apache Beam 管道构建流程如下:

可思yabo88滚球-人工智能资讯平台sykv.com

首先创建一个 Pipeline 对象。

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

不管是yabo88滚球做任何操作,如“ 读取”或“ 创建”及转换都要为管道创建 PCollection 一个或多个的数 据集(PCollection****)。 可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

在 Apache Beam 的管道中你可以对yabo88滚球集 PCollection 做任何操作,例如转换yabo88滚球格式,过滤,分组,分析或以其他方式处理yabo88滚球中的每一个元素。每个转换都会创建一个新输出yabo88滚球集 PCollection,当然你可以在处理完成之前进行做任何的转换处理。 可思yabo88滚球sykv.com,sykv.cn

把你认为最终处理完成的yabo88滚球集写或以其他方式输出最终的存储地方。 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

最后运行管道。

可思yabo88滚球sykv.com,sykv.cn

创建管道对象

可思yabo88滚球-www.sykv.cn,sykv.com

每一个 Apache Beam 程序都会从创建管道(Pipeline)对象开始。 内容来自可思yabo88滚球sykv.com

在 Apache Beam SDK,每一个管道都是一个独立的实体,管道的yabo88滚球集也都封装着它的yabo88滚球和对应的yabo88滚球类型(在 Apache Beam 中有对应的yabo88滚球转换类型包)。最后把yabo88滚球进行用于各种转换操作。 可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

在创建的管道的时候需要设置管道选项 PipelineOptions,有两种创建方式第一种是无参数和一种有参数的。具体两种有什么不同呢? 无参数的可以在程序中指定相应的管道选项参数,如显示设置执行大yabo88滚球引擎参数。有参数的就可以在提交 Apache Beam jar 程序的时候进行用 Shell 脚本的方式后期设置管道对应的参数。 可思yabo88滚球sykv.com,sykv.cn

具体示例如下:

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

无参数

可思yabo88滚球sykv.com

// 首先定义管道的选项
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // 显示设置执行大yabo88滚球引擎
// 创建管道实体对象
Pipeline p = Pipeline.create(options);

可思yabo88滚球-人工智能资讯平台sykv.com

有参数 本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create(); 可思yabo88滚球sykv.com

提交设置参数的格式如下: 可思yabo88滚球-人工智能资讯平台sykv.com

--

可思yabo88滚球-www.sykv.cn,sykv.com
可思yabo88滚球sykv.com,sykv.cn

<>

端到端的测试管道

内容来自可思yabo88滚球sykv.com

端到端的测试,主要针对输入端和输出端两端的测试。要测试整个管道,请执行以下操作:

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

创建一个 Beam 测试 SDK 中所提供的 TestPipeline 实例。 可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

对于多步骤yabo88滚球流水线中的每个输入yabo88滚球源,创建相对应的静态(Static)测试yabo88滚球集。 可思yabo88滚球-www.sykv.cn,sykv.com

使用 Create Transform,将所有的这些静态测试yabo88滚球集转换成 PCollection 作为输入yabo88滚球集。 可思yabo88滚球sykv.com,sykv.cn

按照真实yabo88滚球流水线逻辑,调用所有的 Transforms 操作。

可思yabo88滚球-www.sykv.cn,sykv.com

在yabo88滚球流水线中所有应用到 Write Transform 的地方,都使用 PAssert 来替换这个 Write Transform,并且验证输出的结果是否我们期望的结果相匹配 可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

由于端到端测试跟单个 Pipeline 步骤相似就不在举示例代码。其实开发过程中本地调试打断点,写日志测试也是更快解决问题的一个办法。 可思yabo88滚球-www.sykv.cn,sykv.com

五. Apache Beam 的管道源码解析

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

Apache Beam Pipeline 源码解析 可思yabo88滚球-www.sykv.cn,sykv.com

管道源代码主类是比较简单的,本文针对 Pipeline.java 进行解析。

可思yabo88滚球-www.sykv.cn,sykv.com

1. 定义管道参数及管道创建

可思yabo88滚球sykv.com,sykv.cn

在管道创建首先可以定义管道的选项,例如 Beam 作业程序的名称、唯一标识、运行引擎平台等,当然也可以提交引擎平台用命令指定也可以。然后实例化一个管道对象。 内容来自可思yabo88滚球sykv.com

源码示例如下:

可思yabo88滚球-人工智能资讯平台sykv.com

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

可思yabo88滚球sykv.com,sykv.cn

2. 读取yabo88滚球源 可思yabo88滚球sykv.com

读取要处理的yabo88滚球,有文本yabo88滚球,结构化yabo88滚球和非结构化yabo88滚球以及流yabo88滚球。作为yabo88滚球处理的源yabo88滚球。 可思yabo88滚球sykv.com,sykv.cn

源码示例如下:

内容来自可思yabo88滚球sykv.com

PCollection lines =
p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
可思yabo88滚球-www.sykv.cn,sykv.com

3. 进行yabo88滚球处理操作 内容来自可思yabo88滚球sykv.com

在管道里面可以进行窗口操作、函数操作、原子操作以及 SQL 操作。

本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

yabo88滚球统计的源码示例: 可思yabo88滚球sykv.com,sykv.cn

PCollection<>> wordCounts =allLines
.apply(ParDo.of(new ExtractWords()))
.apply(new Count());

可思yabo88滚球-www.sykv.cn,sykv.com

<>

4. 输出结果及运行

内容来自可思yabo88滚球sykv.com

源代码示例:

内容来自可思yabo88滚球sykv.com

PCollection formattedWordCounts =
wordCounts.apply(ParDo.of(new FormatCounts()));
formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
p.run();
可思yabo88滚球-人工智能资讯平台sykv.com

六.管道实战案例 可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

案例场景描述

可思yabo88滚球sykv.com

随着人工智能 的不断发展,AI Cloud 在银行加快落地,安防 AI 碎片化的应用场景遍地开花。本文结合银行营业网点的业务,介绍管道案例实战。

可思yabo88滚球-人工智能资讯平台sykv.com

以银行的员工脱离岗检测中的行为分析yabo88滚球预处理为例。我们去银行办理业务过程中,首先要取号,然后叫号。叫号提示会对接系统形成一条消息回传后台,但是有时候正常办理业务期间有柜台营业员出去,然后很久才回来。这个时候摄像头会根据柜台离岗时间自动 AI 行为分析生成报警处理。

可思yabo88滚球-www.sykv.cn,sykv.com

案例业务架构流程 可思yabo88滚球-www.sykv.cn,sykv.com

?

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

?

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

叫号报警和行为分析报警产生的yabo88滚球通过营业网点进行上报。 可思yabo88滚球-人工智能资讯平台sykv.com

上传网关集群,网关集群进行转换消息格式压缩消息。 可思yabo88滚球sykv.com,sykv.cn

消息流入消息中心等待消费,消息中心再次起着消峰作用。

内容来自可思yabo88滚球sykv.com

用 Beam 管道的时间窗口特性、流合并处理特性进行消息消费处理

可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

消息进入大yabo88滚球实时分析处理平台处理应用消息。

本文来自可思yabo88滚球(sykv.com),转载请联系本站及注明出处

案例示例核心代码

可思yabo88滚球-AI,sykv.com智能驾驶,人脸识别,区块链,大yabo88滚球

1. 本案例为了节约阅读时间,采用静态yabo88滚球

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

 // 创建管道工厂
PipelineOptions options = PipelineOptionsFactory.create();
// 显式指定 PipelineRunner:FlinkRunner 必须指定如果不制定则为本地
options.setRunner(DirectRunner.class); // 生产环境关闭
// options.setRunner(FlinkRunner.class); // 生成环境打开
Pipeline pipeline = Pipeline.create(options);// 设置相关管道
// 为了演示显示内存yabo88滚球集
// 叫号yabo88滚球
final List<>> txtnoticelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "101 号顾客请到 3 号柜台 "), KV.of("DS-2CD2T26FDWDA3-IS", "102 号顾客请到 1 号柜台 "),
KV.of("DS-2CD6984F-IHS", "103 号顾客请到 4 号柜台 "),
KV.of("DS-2CD7627HWD-LZS", "104 号顾客请到 2 号柜台 "));
//AI 行为分析消息
final List<>> aimessagelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络
"CMOS 智能半球网络摄像机, 山东省济南市解放路支行 3 号柜,type=2,display_image=no"),
KV.of("DS-2CD2T26FDWDA3-IS", "CMOS 智能筒型网络摄像机, 山东省济南市甸柳庄支行 1 号柜台,type=2,display_image=no"),
KV.of("DS-2CD6984F-IHS", " 星光级全景拼接网络摄像机, 山东省济南市市中区支行 4 号柜台,type=2,display_image=no"),
KV.of("DS-2CD7627HWD-LZS", " 全结构化摄像机, 山东省济南市市中区支行 2 号柜台,type=2,display_image=no"));
PCollection<>> notice = pipeline.apply("CreateEmails", Create.of(txtnoticelist));
PCollection<>> message = pipeline.apply("CreatePhones", Create.of(aimessagelist));
final TupleTag noticeTag = new TupleTag<>();
final TupleTag messageTag = new TupleTag<>();
PCollection<>> results = KeyedPCollectionTuple.of(noticeTag, notice).and(messageTag, message).apply(CoGroupByKey.create());

内容来自可思yabo88滚球sykv.com

System.out.append(" 合并分组后的结果:\r");
PCollection contactLines = results.apply(ParDo.of(new DoFn<>, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
KV e = c.element();
String name = e.getKey();
Iterable emailsIter = e.getValue().getAll(noticeTag);
Iterable phonesIter = e.getValue().getAll(messageTag);
System.out.append("" + name + ";" + emailsIter + ";" + phonesIter + ";" + "\r");
}
}));
pipeline.run().waitUntilFinish();

可思yabo88滚球sykv.com,sykv.cn

<><><><><><>

2. 测试运行结果

可思yabo88滚球sykv.com

?

可思yabo88滚球-人工智能资讯平台sykv.com

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

? 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

源码地址: pipelineTest2_5.java 可思yabo88滚球sykv.com,sykv.cn

七.小结 内容来自可思yabo88滚球sykv.com

近几年随着 AloT 发展得如火如荼,其落地场景也遍地开花。loT 作为 AI 落地先锋,已经步入线下各行各业。本文以 Beam 管道的设计切入,重点对 Beam 管道设计工具和源码进行解析,最后结合银行金融行业对 AI 碎片化的场景进行yabo88滚球预处理的案例,帮助大家全面了解 Beam 管道。

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com

作者介绍 可思yabo88滚球-AI,sykv.com人工智能,深度学习,机器学习,神经网络

张海涛,目前就职于海康威视云基础平台,负责海康威视在全国金融行业 AI 大yabo88滚球落地的基础架构设计和中间件的开发,专注 AI 大yabo88滚球方向。Apache Beam 中文社区发起人之一,如果想进一步了解最新 Apache Beam 和 ClickHouse 动态和技术研究成果,请加微信 cyrjkj 入群共同研究和运用。

可思yabo88滚球-yabo88滚球挖掘,智慧医疗,机器视觉,机器人sykv.com


转发量:

网友评论:

发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片 匿名?

关于我们?? 免责声明?? 广告合作?? 版权声明?? 联系方式?? 原创投稿?? 网站地图??

Copyright?2005-2019 Sykv.com 可思yabo88滚球 版权所有 ?? ICP备案:京ICP备14056871号

人工智能资讯?? 人工智能资讯?? 人工智能资讯?? 人工智能资讯

?扫码入群
咨询反馈
扫码关注

微信公众号

返回顶部
关闭