注意:以下文档只适用于TOP接口,请谨慎使用!

文档中心 > 聚石塔

DataWorks(数据工场,原大数据开发套件)中有 7种 类型的节点,分别适用于不同的使用场景。


一、任务类型


1)OPEN_MR 任务

OPEN_MR 任务用于在 MaxCompute 的 MapReduce 编程接口(Java API)基础上实现的数据处理程序的周期运行,使用示例请参见 创建 OPEN_MR 任务

MaxCompute 提供了 MapReduce 编程接口,您可以使用 MapReduce 提供的接口(Java API)编写 MapReduce 程序处理 MaxCompute 中的数据,并打包成为 JAR 等类型的资源文件上传到 DataWorks 中,然后配置 OPEN_MR 节点任务。


2)ODPS_MR 任务

MaxCompute 提供 MapReduce 编程接口,您可以使用 MapReduce 提供的接口(Java API)编写 MapReduce 程序处理 MaxCompute 中的数据,您可以通过创建 ODPS_MR 类型节点的方式在任务调度中使用,使用示例请参见 ODPS_MR 任务


3)ODPS_SQL 任务

ODPS_SQL 任务支持您直接在 Web 端编辑和维护 SQL 代码,并可方便地调试运行和协作开发。DataWorks 还支持代码内容的版本管理和上下游依赖自动解析等功能,使用示例请参见 新建任务

DataWorks 默认使用 MaxCompute 的 project 作为开发生产空间,因此 ODPS_SQL 节点的代码内容遵循 MaxCompute SQL 的语法。MaxCompute SQL 采用的是类似于 Hive 的语法,可以看作是标准 SQL 的子集,但不能因此简单地把 MaxCompute SQL 等价成一个数据库,它在很多方面并不具备数据库的特征,如事务、主键约束、索引等。

具体的 MaxCompute SQL 语法请参见 SQL 概要


4)数据同步任务

数据同步节点任务是阿里云数加平台对外提供的稳定高效、弹性伸缩的数据同步云服务。您通过数据同步节点可以轻松地将业务系统数据同步到 MaxCompute 上来。详情请参见 创建同步任务


5)机器学习任务

机器学习节点用来调用机器学习平台中构建的任务,并按照节点配置进行调度生产。详情请参见 机器学习任务

注意:只有在机器学习平台创建并保存的实验,在 DataWorks 中的机器学习节点中才能选择该实验。

6)Shell 任务

Shell 节点支持标准的 Shell 语法,不支持交互式语法,详情请参见 Shell 任务


7)虚节点任务

虚拟节点属于控制类型节点,它不产生任何数据的空跑节点,常用于工作流统筹节点的根节点,虚节点任务详情请参见 虚节点任务

注意:工作流里最终输出表有多个分支输入表,且这些输入表没有依赖关系时便经常用到虚拟节点。


示例如下:

输出表由 3 个数据同步任务导入的源表经过 ODPS_SQL 任务加工产出,这 3 个数据同步任务没有依赖关系,ODPS_SQL 任务需要依赖 3 个同步任务,则工作流如下图所示:


image.png


用一个虚拟节点作为工作流起始根节点,3 个数据同步任务依赖虚拟节点,ODPS_SQL 加工任务依赖 3 个同步任务 。


二、工作流任务


一个节点任务,可以完成一件事;一个工作流任务,可以完成一个流程。工作流任务是节点任务的集合,一个工作流任务中,最多可以创建 30个 节点任务。请根据您的业务需求,合理选择节点类型,组合完成一个工作流任务。


1. 创建工作流任务


1)进入 数据开发 页面,单击 新建,选择 新建任务。如下图所示:


image.png


2)填写新建任务弹出框中的信息。如下图所示:


image.png


选择任务类型为 工作流任务,调度类型为 周期调度


3)单击 创建,即跳转到工作流设计器页面。


image.png


您可在工作流设计器中根据自身的需求,创建对应的任务,任务类型请参见 任务类型


2. 小贴士


工作流中的节点任务是会依次运行的。


image.png


如上:test_01运行完以后才会运行test_02,如果test_01失败了以后,test_02就不会运行了。


三、虚节点任务


虚拟节点属于控制类型节点,它是不产生任何数据的空跑节点,常用于工作流统筹节点的根节点。

注意:工作流中最终输出表有多个分支输入表,且这些输入表没有依赖关系时便经常用到虚拟节点。


示例如下:

输出表由 3 个数据同步任务导入的源表经过 ODPS_SQL 任务加工产出,这 3 个数据同步任务没有依赖关系,ODPS_SQL 任务需要依赖 3 个同步任务,则工作流如下图所示:


image.png


用一个虚拟节点作为工作流起始根节点,3 个数据同步任务依赖虚拟节点,ODPS_SQL 加工任务依赖 3 个同步任务。


1. 新建虚节点任务


1)进入 数据开发 页面,单击 新建,选择 新建任务


image.png


2)填写新建任务弹出框中的信息。如下图所示:


image.png


选择任务类型为 工作流任务,调度类型为 周期调度


3)单击 创建,即可跳转到工作流设计器页面。


image.png


4)双击 节点组件 中的 虚节点


image.png


5)输入节点名后,单击 创建,得到如下虚节点。


image.png


2. 运行虚节点任务


上一节创建了工作流 dataworks1 ,工作流中只有一个虚节点任务。

1)单击 测试运行


image.png


2)单击 周期任务运行提醒 弹出框中的 确定


image.png


3)单击 测试运行 弹出框中的 运行


image.png


3. 查看任务运行情况


1)单击 工作流任务测试运行 弹出框中的 前往运维中心


image.png


2)双击工作流名称,进入到工作流内。


image.png


进入工作流后,可以看到工作流内节点的运行情况。


image.png


3)选中 start 任务,右键单击 查看节点运行日志


image.png


任务日志提示:当前实例,没有产生日志信息


image.png


出现此情况的原因:虚节点任务不会真正的执行,等到虚节点运行的时候,便会直接被置为成功,所以虚节点没有日志信息。


四、数据同步任务


目前数据同步任务支持的数据源类型包括:MaxCompute、RDS(MySQL、SQL Server、PostgreSQL)、Oracle、FTP、AnalyticDB、OSS、DRDS,更多支持的数据源请参见 支持数据源类型


image.png


本文以 RDS 数据同步至 MaxCompute 为例,详细说明如何进行数据同步任务。


操作步骤

1)创建数据表

创建 MaxCompute 表的详细操作请参见 创建表


2)新建数据源

注意:新建数据源需项目管理员角色才能够创建。


当 RDS 数据源测试连通性不通时,需要到自己的 RDS 上添加数据同步机器 IP 白名单:

11.192.97.82,11.192.98.76,10.152.69.0/24,10.153.136.0/24,10.143.32.0/24,120.27.160.26,10.46.67.156,120.27.160.81,10.46.64.81,121.43.110.160,10.117.39.238,121.43.112.137,10.117.28.203,118.178.84.74,10.27.63.41,118.178.56.228,10.27.63.60,118.178.59.233,10.27.63.38,118.178.142.154,10.27.63.15,100.64.0.0/8


注意:若使用自定义资源组调度 RDS 的数据同步任务,必须把自定义资源组的机器 IP 也加到 RDS 的白名单中。


① 以开发者身份进入 阿里云数加平台 > DataWorks(数据工场) > 管理控制台 页面,单击项目操作栏中的 入工作区

② 单击顶部菜单栏中的 数据集成,导航至 数据源 页面。

③ 单击右上角的 新增数据源


image.png


④ 在新增数据源弹出框中填写相关配置项。


image.png


上图中的配置项具体说明如下:


配置项

说明

数据源类型

阿里云数据库(RDS)。

数据源名称

由英文字母、数字、下划线组成且需以字符或下划线开头,长度不超过 30 个字符。

数据源描述

对数据源的简单描述,不超过 1024 个字符。

RDS 实例 ID

该 MySQL 数据源的 RDS 实例 ID。

RDS 实例购买者 ID

该 MySQL 数据源的 RDS 实例购买者 ID。

若选择 JDBC 形式来配置数据源,其 JDBC 连接信息,格式为:jdbc:mysql://IP:Port/database。

数据库名

该数据源对应的数据库名。

用户名/密码

数据库对应的用户名和密码。


⑤ 单击 测试连通性

⑥ 若测试连通性成功,单击 保存 即可。
若测试连通性失败,请根据自身情况参见:ECS 上自建的数据库测试连通性失败RDS 数据源测试连通性不通
关于其他类型(MaxCompute、RDS、Oracle、FTP、AnalyticDB、OSS、DRDS)数据源的配置,详见 数据源配置


3)新建任务

① 单击 数据开发 页面工具栏中的 新建任务

② 填写新建任务弹出框中的各配置项。


image.png


此处以节点任务为例,若节点需要每日自动调度运行,调度类型选择 周期调度,然后在节点属性中配置调度周期。

③ 单击 创建


4)配置数据同步任务

同步任务节点包括 选择来源选择目标字段映射通道控制 四大配置项。

① 选择来源选择 数据源 数据表


image.png


数据过滤:可参考相应的 SQL 语法填写 where 过滤语句(不需要填写 where 关键字),该过滤条件将作为增量同步的条件。

注意:

where 条件即针对源头数据筛选条件,根据指定的 column、table、where 条件拼接 SQL 进行数据抽取。利用 where 条件可进行全量同步和增量同步,具体说明如下:

i)全量同步:
第一次做数据导入时通常为全量导入,可不用设置 where 条件。

ii)增量同步:
增量导入在实际业务场景中,往往会选择当天的数据进行同步,通常需要编写 where 条件语句,请先确认表中描述增量字段(时间戳)为哪一个。如 tableA 描述增量的字段为 create_time,那么在 where 条件中编写 create_time>${yesterday},在参数配置中为其参数赋值即可。其中更多内置参数的使用方法,请参见 系统调度参数


② 若数据同步任务是 RDS/Oracle/MaxCompute,在该页面中会有切分键配置。

切分键:只支持类型为整型的字段读取数据时,根据配置的字段进行数据分片,实现并发读取,可提升数据同步效率。只有同步任务是 RDS/Oracle 数据导入至 MaxCompute 时,才显示切分键配置项。

注意:若源头为 Mysql 数据源,则数据同步任务还支持分库分表模式的数据导入(前提是无论数据存储在同一数据库还是不同数据库,表结构必须是一致的)。


③ 分库分表可支持如下场景:

i)同库多表:单击搜索表,添加需要同步的多张表即可。

ii)分库多表:首先单击添加选择源库,再单击搜索表来添加表。

④ 选择目标

单击 快速建表 可将源头表的建表语句转化为符合 MaxCompute SQL 语法规范的 DDL 语句新建目标表。选择后单击 下一步


image.png


i)分区信息:分区是为了便于查询部分数据引入的特殊列,指定分区便于快速定位到需要的数据。此处的自定义参数是将昨天的日期做为这个分区的值,分区值支持常量和变量,更多自定义参数请参见 参数配置

ii)清理规则:

a. 写入前清理已有数据:导数据之前,清空表或者分区的所有数据,相当于 insert overwrite。

b. 写入前保留已有数据:导数据之前不清理任何数据,每次运行数据都是追加进去的,相当于 insert into。

⑤ 在参数配置中为参数赋值,如下图所示:


image.png


⑥ 映射字段
需对字段映射关系进行配置,左侧 源头表字段 和右侧 目标表字段 为一一对应的关系。


image.png


增加/删除:鼠标 Hover 上每一行,单击删除图标可以删除当前字段。单击 添加一行 可单个增加字段,当数据库类型是 MaxCompute 时,可以将分区列的列名,作为添加一行的值,这样可以在同步的时候,将分区列也同步过去。


自定义变量和常量的写入方法:

如果需要把常量或者变量导入 MaxCompute 中表的某个字段,只需要单击插入按钮,然后输入常量或者变量的值,并且用英文单引号包起来即可。如变量 ${yesterday},在参数配置组件配置给变量赋值如 yesterday=$[yyyymmdd]。具体时间参数请参见 系统调度参数


⑦ 通道控制

通道控制 用来配置作业速率上限和脏数据检查规则,如下图所示:


image.png


a. 作业速率上限:即配置当前数据同步任务速率,支持最大为 20MB/s(通道流量度量值是数据同步任务本身的度量值,不代表实际网卡流量)。

b. 作业并发数:作业并发数必须配置了切分建以后才有效。作业并发数的上限是作业速率的上限,比如说作业速率上限是 10M,作业并发数最大可以选择 10。

c. 当错误纪录数:写入 RDS、Oracle 时可用,即脏数据数量,超过所配置的个数时,该数据同步任务结束。

注意:我们不建议作业并发数配置过大,作业并发数越大,所消耗的资源也越多,很有可能会导致您别的任务会产生等待资源的情况,影响其他任务运行。


⑧ 预览保存
完成以上配置后,单击 下一步 即可预览,如若无误,单击 。如下图所示:


image.png


5)提交数据同步任务,并测试工作流

① 单击顶部菜单栏中的 提交

② 提交成功后单击 测试运行

注意:

因为本示例中源表里 cratetime 有时间为 2017-01-04 ,而配置中用到调度时间参数 $[yyyy-mm-dd-1] 和 ${bdp.system.bizdate},为了能在测试的时候将 cratetime 赋值为 2017-01-04,目标表的分区值为 20170104,测试的时候业务时间要选择 2017-01-04。如下图所示:


image.png


③ 测试任务触发成功后,单击 前往运维中心 即可查看任务进度。


image.png


④ 查看同步数据。


image.png


五、OPEN MR


注意:OPEN_MR 不支持引用资源表,不支持多个 Reduce 等。


ODPS MR 比 OPEN MR 的功能更加强大,ODPS MR 支持添加更多的Jar包资源、表资源等;更好的结合ODPS MR的原生语法,无需在界面上进行过多配置,可直接使用Maxcompute MR的语法调用,只需在配置任务时将Jar包资源先引用即可,详细的ODPS MR节点配置,可参考 ODPS_MR建议优先使用ODPS MR


1. 应用场景和数据说明


本示例将以经典的 WordCount 示例来介绍如何在阿里云大数据平台使用 MaxCompute MapReduce。 WordCount 示例的详细内容请参见 WordCount 示例

本示例涉及的数据表说明如下:

1)输入数据表:wc_in 用于存储 word 列表。

2)输出数据表:wc_out 用于存放通过 MR 程序处理后的结果集。


2. 数据表准备


1)创建数据表

根据 创建表 中的操作新建表 wc_in、wc_out。


CREATE TABLE wc_in (key STRING, value STRING) partitioned by (pt string );
CREATE TABLE wc_out (key STRING, cnt BIGINT) partitioned by (pt string );


2)插入示例数据

为感知 OPEN MR 程序在大数据平台上运行的结果,需向输入表(wc_in 的分区 pt=20170101)中插入示例数据。

操作步骤

① 进入 数据开发 页面,导航至 新建 > 新建脚本文件

② 填写 新建脚本文件 弹出框中的各配置项,单击 提交


image.png


③ 在 MaxCompute 代码编辑器中编写 MaxCompute SQL 并运行代码。更多 SQL 语法请参见 SQL 概要。MaxCompute SQL 脚本如下所示:


---创建系统dual
drop table if exists dual;
create table dual(id bigint); --如project中不存在此伪表,则需创建并初始化数据
---向系统伪表初始化数据
insert overwrite table dual select count(*)from dual;
---向输入表 wc_in 的分区 pt=20170101 插入示例数据
insert overwrite table wc_in partition(pt=20170101) select * from (
select 'project','val_pro' from dual
union all
select 'problem','val_pro' from dual
union all
select 'package','val_a' from dual
union all
select 'pad','val_a' from dual
) b;


④ 编写查询语句查看已插入的示例数据,如下图所示:


image.png


3. 编写 MapReduce 程序


您在使用 OPEN_MR 节点前,需在本地基于 MaxCompute MapReduce 编程框架的 WordCount 示例代码,根据自身需求进行编写,然后打成 Jar 包,以资源的方式添加到大数据平台。MR 开发的相关内容请参见 大数据计算服务 MaxCompute 帮助文档。本示例代码详情请参见 WordCount.java 附件。


4. 添加资源


无论是在 MaxCompute console 还是阿里云大数据平台中运行,都需要执行 Jar 命令运行。因此,先打包生成 WordCount.jar(可以通过 Eclipse 的 Export 功能打包,也可以通过 ant 或其他工具生成),再上传至 MaxCompute 资源。


操作步骤

① 进入 数据开发 页面的 资源管理 模块,右键单击目录选择 上传资源

② 填写 资源上传 弹出框的各配置项,注意勾选 上传为 ODPS 资源


image.png


③ 单击 提交


5. 创建 OPEN_MR 节点


新建的 MaxCompute MapReduce 程序以资源方式上传至 MaxCompute,现需新建 OPEN_MR 节点来调用执行。


操作步骤

① 进入 数据开发 页面,导航至 新建 > 新建脚本文件

② 填写 新建任务 弹出框的各配置项。


image.png


配置项说明:

i)任务名称:wordcount 示例。

ii)描述:wordcount 示例。

③ 单击 创建

④ 在 OPEN_MR 配置页面进行配置。


image.png


配置项说明:

i)MRJar 包:必选项,即本节点需要运行的主 jar 资源包。

ii)资源:必填项,本节点需要运行的主 jar 资源以及调用到的其他资源列表。

iii)输入/输出表:本示例中用到的是本项目的分区表,且分区值为每日自动调度的业务日期,因此分区用变量(系统调度参数)表示 。

⑤ 参数配置,由于本示例分区用系统参数表示,没有用自定义变量,所以此处无需而外配置:


image.png


注意:更多参数变量使用请参见 系统调度参数


⑥ 单击 保存提交,切换到工作流的流程面板中,单击 测试运行

注意:测试运行时由于示例表只有分区 pt=20170101 有数据,所以业务时间选择 2017-01-01,这样系统参数才会把输入/输出表的分区替换成 20170101。


image.png


image.png


⑦ 生成测试任务后,等待运行成功 。


6. 查看结果


操作步骤

① 打开 wc_in 插入示例数据 脚本文件。

② 编写查询 MaxCompute SQL 代码。

③ 单击 运行


image.png


查看测试结果和预期是否一致。


六、SHELL任务


SHELL 任务支持标准 SHELL 语法,不支持交互性语法。SHELL 任务可以在默认资源组上运行,若需要访问 IP/域名,请在 项目管理-项目配置 下将 IP/域名添加到白名单中。


1. 操作步骤

创建 SHELL 节点

① 进入数据开发页面,单击 新建,选择 新建任务


image.png


② 填写 新建任务 弹出框中各配置项。此示例中任务类型选择 节点任务,类型选择 SHELL,调度类型选择 周期调度。如下图所示:


image.png


③ 填写完成后,单击 创建

2. 编辑 SHELL 节点

若想在 SHELL 中调用 系统调度参数,如下所示:


image.png


SHELL 语句如下:

echo "$1 $2 $3"


注意:参数1 参数2 …..多个参数之间用空格分隔。更多系统调度参数的使用,请参见 调度属性配置 - 系统参数


3. 测试 SHELL 任务

测试 SHELL 任务有两种方式:页面直接 测试运行


image.png


两种运行方式的区别,如下所示:

i)运行:在页面直接单击 运行 时,任务是执行在默认资源组上的;若任务运行时,需要准备运行环境,那么建议使用测试运行。
比如:通过 SHELL 调用 pyodps 时,您就需要将 pyodps 需要的依赖给准备在您的机器上,将任务指定在您的机器上运行。

ii)测试运行:测试运行的任务会生成实例,支持在指定的资源组上运行;若任务运行时,需要准备运行环境,那么建议使用测试运行。


注意:如何将任务运行在指定的资源组上呢?可以进入 运维中心-任务管理 页面,选中任务单击 修改资源组,这样任务便会运行在指定的机器上。


image.png


1)运行

① 在页面单击 运行 后,会出现下图所示的提示:


image.png


② 单击 去设置,跳转到如下页面,单击 添加


image.png


注意:在添加白名单时,可以输入域名/IP 和端口,添加完毕的 IP/域名,默认资源组可直接访问。


③ 切换回 数据开发 页面,单击 运行,查看结果。


image.png


注意:系统调度参数只有在调度系统中才会被替换,因为页面单击 时,没有提交到调度系统中,所以 $3 被替换成了 0。


2)测试运行

① 在页面单击 测试运行 后,会出现下图所示的提示:


image.png


② 单击 确定 后,系统会检验您任务是否保存,且是否提交到调度系统;若未保存/提交,会弹出提示框,引导您保存/提交任务。


image.png


③ 单击 确定提交, 出现测试运行弹出框,选择业务日期:


image.png


④ 单击 运行,会提示您测试运行已经触发成功,可以前往运维中心查看进度。


image.png


⑤ 单击 前往运维中心,进入运维中心页面后,右键单击任务名,选择 查看节点运行日志


image.png


日志信息如下:


image.png


从日志中可以看出,系统调度参数已经被替换。

注意:为什么选择的业务日期是 20170806 号,替换出的结果是 20170807 呢?其遵循的转换规则为:实际时间=业务日期+1。


4. 应用场景

通过 SHELL 连接数据库

1)若数据库是在阿里云上搭建的,且区域是华东2,那么需要将数据库对如下白名单开放,即可连接数据库。
10.152.69.0/24,10.153.136.0/24,10.143.32.0/24,120.27.160.26,10.46.67.156,120.27.160.81,10.46.64.81,121.43.110.160,10.117.39.238,121.43.112.137,10.117.28.203,118.178.84.74,10.27.63.41,118.178.56.228,10.27.63.60,118.178.59.233,10.27.63.38,118.178.142.154,10.27.63.15,100.64.0.0/8


注意:如果是在阿里云上搭建的数据库,但区域不是华东2,则建议使用外网或购买与数据库同区域的 ECS 做为调度资源,将该 SHELL 任务运行在自定义资源组上。


2)若数据库是自己在本地搭建的,那么建议使用外网连接,且将数据库对上述白名单 IP 开放。


注意:若使用自定义资源组运行 SHELL 任务,必须把自定义资源组的机器 IP 也加到上述白名单中。

FAQ

关于此文档暂时还没有FAQ
返回
顶部