注意:以下文档只适用于TOP接口,请谨慎使用!

文档中心 > 聚石塔

任务类型

更新时间:2018/06/11 访问次数:2510

DataWorks(数据工场,原大数据开发套件)中有 7种 类型的节点,分别适用于不同的使用场景。

任务类型

OPEN_MR 任务

OPEN_MR 任务用于在 MaxCompute 的 MapReduce 编程接口(Java API)基础上实现的数据处理程序的周期运行,使用示例请参见 创建 OPEN_MR 任务

MaxCompute 提供了 MapReduce 编程接口,您可以使用 MapReduce 提供的接口(Java API)编写 MapReduce 程序处理 MaxCompute 中的数据,并打包成为 JAR 等类型的资源文件上传到 DataWorks 中,然后配置 OPEN_MR 节点任务。

ODPS_MR 任务

MaxCompute 提供 MapReduce 编程接口,您可以使用 MapReduce 提供的接口(Java API)编写 MapReduce 程序处理 MaxCompute 中的数据,您可以通过创建 ODPS_MR 类型节点的方式在任务调度中使用,使用示例请参见 ODPS_MR 任务

ODPS_SQL 任务

ODPS_SQL 任务支持您直接在 Web 端编辑和维护 SQL 代码,并可方便地调试运行和协作开发。DataWorks 还支持代码内容的版本管理和上下游依赖自动解析等功能,使用示例请参见 新建任务

DataWorks 默认使用 MaxCompute 的 project 作为开发生产空间,因此 ODPS_SQL 节点的代码内容遵循 MaxCompute SQL 的语法。MaxCompute SQL 采用的是类似于 Hive 的语法,可以看作是标准 SQL 的子集,但不能因此简单地把 MaxCompute SQL 等价成一个数据库,它在很多方面并不具备数据库的特征,如事务、主键约束、索引等。

具体的 MaxCompute SQL 语法请参见 SQL 概要

数据同步任务

数据同步节点任务是阿里云数加平台对外提供的稳定高效、弹性伸缩的数据同步云服务。您通过数据同步节点可以轻松地将业务系统数据同步到 MaxCompute 上来。详情请参见 创建同步任务

机器学习任务

机器学习节点用来调用机器学习平台中构建的任务,并按照节点配置进行调度生产。详情请参见 机器学习任务

注意:

只有在机器学习平台创建并保存的实验,在 DataWorks 中的机器学习节点中才能选择该实验。

Shell 任务

Shell 节点支持标准的 Shell 语法,不支持交互式语法,详情请参见 Shell 任务

虚节点任务

虚拟节点属于控制类型节点,它不产生任何数据的空跑节点,常用于工作流统筹节点的根节点,虚节点任务详情请参见 虚节点任务

注意

工作流里最终输出表有多个分支输入表,且这些输入表没有依赖关系时便经常用到虚拟节点。

示例如下

输出表由 3 个数据同步任务导入的源表经过 ODPS_SQL 任务加工产出,这 3 个数据同步任务没有依赖关系,ODPS_SQL 任务需要依赖 3 个同步任务,则工作流如下图所示:

用一个虚拟节点作为工作流起始根节点,3 个数据同步任务依赖虚拟节点,ODPS_SQL 加工任务依赖 3 个同步任务 。

工作流任务

一个节点任务,可以完成一件事;一个工作流任务,可以完成一个流程。工作流任务是节点任务的集合,一个工作流任务中,最多可以创建 30个 节点任务。请根据您的业务需求,合理选择节点类型,组合完成一个工作流任务。

创建工作流任务

  1. 进入 数据开发 页面,单击 新建,选择 新建任务。如下图所示:

  2. 填写新建任务弹出框中的信息。如下图所示:

    1

    选择任务类型为 工作流任务,调度类型为 周期调度

  3. 单击 创建,即跳转到工作流设计器页面。

    您可在工作流设计器中根据自身的需求,创建对应的任务,任务类型请参见 任务类型

小贴士

工作流中的节点任务是会依次运行的。

如上:test_01运行完以后才会运行test_02,如果test_01失败了以后,test_02就不会运行了。

虚节点任务

虚拟节点属于控制类型节点,它是不产生任何数据的空跑节点,常用于工作流统筹节点的根节点。

注意

工作流中最终输出表有多个分支输入表,且这些输入表没有依赖关系时便经常用到虚拟节点。

示例如下

输出表由 3 个数据同步任务导入的源表经过 ODPS_SQL 任务加工产出,这 3 个数据同步任务没有依赖关系,ODPS_SQL 任务需要依赖 3 个同步任务,则工作流如下图所示:

用一个虚拟节点作为工作流起始根节点,3 个数据同步任务依赖虚拟节点,ODPS_SQL 加工任务依赖 3 个同步任务。

新建虚节点任务

  1. 进入 数据开发 页面,单击 新建,选择 新建任务

  2. 填写新建任务弹出框中的信息。如下图所示:

    1

    选择任务类型为 工作流任务,调度类型为 周期调度

  3. 单击 创建,即可跳转到工作流设计器页面。

  4. 双击 节点组件 中的 虚节点

    1

  5. 输入节点名后,单击 创建,得到如下虚节点。

运行虚节点任务

上一节创建了工作流 dataworks1 ,工作流中只有一个虚节点任务。

  1. 单击 测试运行

  2. 单击 周期任务运行提醒 弹出框中的 确定

  3. 单击 测试运行 弹出框中的 运行

查看任务运行情况

  1. 单击 工作流任务测试运行 弹出框中的 前往运维中心

    1

  2. 双击工作流名称,进入到工作流内。

    进入工作流后,可以看到工作流内节点的运行情况。

    1

  3. 选中 start 任务,右键单击 查看节点运行日志

    1

    任务日志提示:当前实例,没有产生日志信息

    1

    出现此情况的原因:虚节点任务不会真正的执行,等到虚节点运行的时候,便会直接被置为成功,所以虚节点没有日志信息。

数据同步任务

目前数据同步任务支持的数据源类型包括:MaxCompute、RDS(MySQL、SQL Server、PostgreSQL)、Oracle、FTP、AnalyticDB、OSS、DRDS,更多支持的数据源请参见 支持数据源类型

本文以 RDS 数据同步至 MaxCompute 为例,详细说明如何进行数据同步任务。

操作步骤

创建数据表

创建 MaxCompute 表的详细操作请参见 创建表

新建数据源

注意:

新建数据源需项目管理员角色才能够创建。

当 RDS 数据源测试连通性不通时,需要到自己的 RDS 上添加数据同步机器 IP 白名单:

11.192.97.82,11.192.98.76,10.152.69.0/24,10.153.136.0/24,10.143.32.0/24,120.27.160.26,10.46.67.156,120.27.160.81,10.46.64.81,121.43.110.160,10.117.39.238,121.43.112.137,10.117.28.203,118.178.84.74,10.27.63.41,118.178.56.228,10.27.63.60,118.178.59.233,10.27.63.38,118.178.142.154,10.27.63.15,100.64.0.0/8

注意:

若使用自定义资源组调度 RDS 的数据同步任务,必须把自定义资源组的机器 IP 也加到 RDS 的白名单中。

  1. 以开发者身份进入 阿里云数加平台 > DataWorks(数据工场) > 管理控制台 页面,单击项目操作栏中的 进入工作区

  2. 单击顶部菜单栏中的 数据集成,导航至 数据源 页面。

  3. 单击右上角的 新增数据源

  4. 在新增数据源弹出框中填写相关配置项。

    上图中的配置项具体说明如下:

    • 数据源类型:阿里云数据库(RDS)。

    • 数据源名称:由英文字母、数字、下划线组成且需以字符或下划线开头,长度不超过 30 个字符。

    • 数据源描述:对数据源的简单描述,不超过 1024 个字符。

    • RDS 实例 ID:该 MySQL 数据源的 RDS 实例 ID。

    • RDS 实例购买者 ID:该 MySQL 数据源的 RDS 实例购买者 ID。

      若选择 JDBC 形式来配置数据源,其 JDBC 连接信息,格式为:jdbc:mysql://IP:Port/database。

    • 数据库名:该数据源对应的数据库名。

    • 用户名/密码:数据库对应的用户名和密码。

  5. 单击 测试连通性

  6. 若测试连通性成功,单击 保存 即可。

    若测试连通性失败,请根据自身情况参见:ECS 上自建的数据库测试连通性失败 或 RDS 数据源测试连通性不通

    关于其他类型(MaxCompute、RDS、Oracle、FTP、AnalyticDB、OSS、DRDS)数据源的配置,详见 数据源配置

新建任务

  1. 单击 数据开发 页面工具栏中的 新建任务

  2. 填写新建任务弹出框中的各配置项。

    此处以节点任务为例,若节点需要每日自动调度运行,调度类型选择 周期调度,然后在节点属性中配置调度周期。

  3. 单击 创建

配置数据同步任务

同步任务节点包括 选择来源选择目标字段映射通道控制 四大配置项。

  1. 选择来源

    选择 数据源 和 数据表

    • 数据过滤:可参考相应的 SQL 语法填写 where 过滤语句(不需要填写 where 关键字),该过滤条件将作为增量同步的条件。

      注意:

      where 条件即针对源头数据筛选条件,根据指定的 column、table、where 条件拼接 SQL 进行数据抽取。利用 where 条件可进行全量同步和增量同步,具体说明如下:

      • 全量同步:

        第一次做数据导入时通常为全量导入,可不用设置 where 条件。

      • 增量同步:

        增量导入在实际业务场景中,往往会选择当天的数据进行同步,通常需要编写 where 条件语句,请先确认表中描述增量字段(时间戳)为哪一个。如 tableA 描述增量的字段为 create_time,那么在 where 条件中编写 create_time>${yesterday},在参数配置中为其参数赋值即可。其中更多内置参数的使用方法,请参见 系统调度参数

    若数据同步任务是 RDS/Oracle/MaxCompute,在该页面中会有切分键配置。

    • 切分键:只支持类型为整型的字段。读取数据时,根据配置的字段进行数据分片,实现并发读取,可提升数据同步效率。只有同步任务是 RDS/Oracle 数据导入至 MaxCompute 时,才显示切分键配置项。

      注意:

      若源头为 Mysql 数据源,则数据同步任务还支持分库分表模式的数据导入(前提是无论数据存储在同一数据库还是不同数据库,表结构必须是一致的)。

    分库分表可支持如下场景:

    • 同库多表:单击搜索表,添加需要同步的多张表即可。

    • 分库多表:首先单击添加选择源库,再单击搜索表来添加表。

  2. 选择目标

    单击 快速建表 可将源头表的建表语句转化为符合 MaxCompute SQL 语法规范的 DDL 语句新建目标表。选择后单击 下一步

    • 分区信息:分区是为了便于查询部分数据引入的特殊列,指定分区便于快速定位到需要的数据。此处的自定义参数是将昨天的日期做为这个分区的值,分区值支持常量和变量,更多自定义参数请参见 参数配置

    • 清理规则:

      • 写入前清理已有数据:导数据之前,清空表或者分区的所有数据,相当于 insert overwrite。

      • 写入前保留已有数据:导数据之前不清理任何数据,每次运行数据都是追加进去的,相当于 insert into。

    在参数配置中为参数赋值,如下图所示:

    1

  3. 映射字段

    需对字段映射关系进行配置,左侧 源头表字段 和右侧 目标表字段 为一一对应的关系。

    增加/删除:鼠标 Hover 上每一行,单击删除图标可以删除当前字段。单击 添加一行 可单个增加字段,当数据库类型是 MaxCompute 时,可以将分区列的列名,作为添加一行的值,这样可以在同步的时候,将分区列也同步过去。

    自定义变量和常量的写入方法:

    如果需要把常量或者变量导入 MaxCompute 中表的某个字段,只需要单击插入按钮,然后输入常量或者变量的值,并且用英文单引号包起来即可。如变量 ${yesterday},在参数配置组件配置给变量赋值如 yesterday=$[yyyymmdd]。具体时间参数请参见 系统调度参数

  4. 通道控制

    通道控制 用来配置作业速率上限和脏数据检查规则,如下图所示:

    • 作业速率上限:即配置当前数据同步任务速率,支持最大为 20MB/s(通道流量度量值是数据同步任务本身的度量值,不代表实际网卡流量)。

    • 作业并发数:作业并发数必须配置了切分建以后才有效。作业并发数的上限是作业速率的上限,比如说作业速率上限是 10M,作业并发数最大可以选择 10。

    • 当错误纪录数:写入 RDS、Oracle 时可用,即脏数据数量,超过所配置的个数时,该数据同步任务结束。

    注意:

    我们不建议作业并发数配置过大,作业并发数越大,所消耗的资源也越多,很有可能会导致您别的任务会产生等待资源的情况,影响其他任务运行。

  5. 预览保存

    完成以上配置后,单击 下一步 即可预览,如若无误,单击 保存。如下图所示:

提交数据同步任务,并测试工作流

  1. 单击顶部菜单栏中的 提交

  2. 提交成功后单击 测试运行

    注意:

    因为本示例中源表里 cratetime 有时间为 2017-01-04 ,而配置中用到调度时间参数 $[yyyy-mm-dd-1] 和 ${bdp.system.bizdate},为了能在测试的时候将 cratetime 赋值为 2017-01-04,目标表的分区值为 20170104,测试的时候业务时间要选择 2017-01-04。如下图所示:

  3. 测试任务触发成功后,单击 前往运维中心 即可查看任务进度。

  4. 查看同步数据。

OPEN MR

注意
OPEN_MR 不支持引用资源表,不支持多个 Reduce 等。

ODPS MR 比 OPEN MR 的功能更加强大,ODPS MR 支持添加更多的Jar包资源、表资源等;更好的结合ODPS MR的原生语法,无需在界面上进行过多配置,可直接使用Maxcompute MR的语法调用,只需在配置任务时将Jar包资源先引用即可,详细的ODPS MR节点配置,可参考 ODPS_MR建议优先使用ODPS MR

应用场景和数据说明

本示例将以经典的 WordCount 示例来介绍如何在阿里云大数据平台使用 MaxCompute MapReduce。 WordCount 示例的详细内容请参见 WordCount 示例

本示例涉及的数据表说明如下:

  • 输入数据表:wc_in 用于存储 word 列表。

  • 输出数据表:wc_out 用于存放通过 MR 程序处理后的结果集。

数据表准备

创建数据表

根据 创建表 中的操作新建表 wc_in、wc_out。

  1. CREATE TABLE wc_in (key STRING, value STRING) partitioned by (pt string );
  2. CREATE TABLE wc_out (key STRING, cnt BIGINT) partitioned by (pt string );

插入示例数据

为感知 OPEN MR 程序在大数据平台上运行的结果,需向输入表(wc_in 的分区 pt=20170101)中插入示例数据。

操作步骤

  1. 进入 数据开发 页面,导航至 新建 > 新建脚本文件

  2. 填写 新建脚本文件 弹出框中的各配置项,单击 提交。  

  3. 在 MaxCompute 代码编辑器中编写 MaxCompute SQL 并运行代码。更多 SQL 语法请参见 SQL 概要

    MaxCompute SQL 脚本如下所示:

    1. ---创建系统dual
    2. drop table if exists dual;
    3. create table dual(id bigint); --如project中不存在此伪表,则需创建并初始化数据
    4. ---向系统伪表初始化数据
    5. insert overwrite table dual select count(*)from dual;
    6. ---向输入表 wc_in 的分区 pt=20170101 插入示例数据
    7. insert overwrite table wc_in partition(pt=20170101) select * from (
    8. select 'project','val_pro' from dual
    9. union all
    10. select 'problem','val_pro' from dual
    11. union all
    12. select 'package','val_a' from dual
    13. union all
    14. select 'pad','val_a' from dual
    15. ) b;
  4. 编写查询语句查看已插入的示例数据,如下图所示:

    SQL日志

编写 MapReduce 程序

您在使用 OPEN_MR 节点前,需在本地基于 MaxCompute MapReduce 编程框架的 WordCount 示例代码,根据自身需求进行编写,然后打成 Jar 包,以资源的方式添加到大数据平台。MR 开发的相关内容请参见 大数据计算服务 MaxCompute 帮助文档。本示例代码详情请参见 WordCount.java 附件。

添加资源

无论是在 MaxCompute console 还是阿里云大数据平台中运行,都需要执行 Jar 命令运行。因此,先打包生成 WordCount.jar(可以通过 Eclipse 的 Export 功能打包,也可以通过 ant 或其他工具生成),再上传至 MaxCompute 资源。

操作步骤

  1. 进入 数据开发 页面的 资源管理 模块,右键单击目录选择 上传资源

  2. 填写 资源上传 弹出框的各配置项,注意勾选 上传为 ODPS 资源

  3. 单击 提交

创建 OPEN_MR 节点

新建的 MaxCompute MapReduce 程序以资源方式上传至 MaxCompute,现需新建 OPEN_MR 节点来调用执行。

操作步骤

  1. 进入 数据开发 页面,导航至 新建 > 新建脚本文件

  2. 填写 新建任务 弹出框的各配置项。

    配置项说明:

    • 任务名称:wordcount 示例。

    • 描述:wordcount 示例。

  3. 单击 创建

  4. 在 OPEN_MR 配置页面进行配置。

    配置项说明:

    • MRJar 包:必选项,即本节点需要运行的主 jar 资源包。
    • 资源:必填项,本节点需要运行的主 jar 资源以及调用到的其他资源列表。
    • 输入/输出表:本示例中用到的是本项目的分区表,且分区值为每日自动调度的业务日期,因此分区用变量(系统调度参数)表示 。

    参数配置,由于本示例分区用系统参数表示,没有用自定义变量,所以此处无需而外配置:

    参数配置

    注意:

    更多参数变量使用请参见 系统调度参数

  5. 单击 保存提交,切换到工作流的流程面板中,单击 测试运行

    注意:

    测试运行时由于示例表只有分区 pt=20170101 有数据,所以业务时间选择 2017-01-01,这样系统参数才会把输入/输出表的分区替换成 20170101。

    保存提交

    生成测试任务后,等待运行成功 。

查看结果

操作步骤

  1. 打开 wc_in 插入示例数据 脚本文件。

  2. 编写查询 MaxCompute SQL 代码。

  3. 单击 运行

    查看测试结果和预期是否一致。

SHELL任务

SHELL 任务支持标准 SHELL 语法,不支持交互性语法。SHELL 任务可以在默认资源组上运行,若需要访问 IP/域名,请在 项目管理-项目配置 下将 IP/域名添加到白名单中。

操作步骤

创建 SHELL 节点

  1. 进入数据开发页面,单击 新建,选择 新建任务

  2. 填写 新建任务 弹出框中各配置项。此示例中任务类型选择 节点任务,类型选择 SHELL,调度类型选择 周期调度。如下图所示:

  3. 填写完成后,单击 创建

编辑 SHELL 节点

若想在 SHELL 中调用 系统调度参数,如下所示:

SHELL 语句如下:

  1. echo "$1 $2 $3"

注意:

参数1 参数2 …..多个参数之间用空格分隔。更多系统调度参数的使用,请参见 调度属性配置 - 系统参数

测试 SHELL 任务

测试 SHELL 任务有两种方式:页面直接 运行 和 测试运行

两种运行方式的区别,如下所示:

  • 运行:在页面直接单击 运行 时,任务是执行在默认资源组上的;若任务运行时,需要准备运行环境,那么建议使用测试运行。

    比如:通过 SHELL 调用 pyodps 时,您就需要将 pyodps 需要的依赖给准备在您的机器上,将任务指定在您的机器上运行。

  • 测试运行:测试运行的任务会生成实例,支持在指定的资源组上运行;若任务运行时,需要准备运行环境,那么建议使用测试运行。

    注意:

    如何将任务运行在指定的资源组上呢?可以进入 运维中心-任务管理 页面,选中任务单击 修改资源组,这样任务便会运行在指定的机器上。

  • 运行

  1. 在页面单击 运行 后,会出现下图所示的提示:

  2. 单击 去设置,跳转到如下页面,单击 添加

    注意:

    在添加白名单时,可以输入域名/IP 和端口,添加完毕的 IP/域名,默认资源组可直接访问。

  3. 切换回 数据开发 页面,单击 运行,查看结果。

    注意:

    系统调度参数只有在调度系统中才会被替换,因为页面单击 运行 时,没有提交到调度系统中,所以 $3 被替换成了 0。

  • 测试运行

  1. 在页面单击 测试运行 后,会出现下图所示的提示:

    1

  2. 单击 确定 后,系统会检验您任务是否保存,且是否提交到调度系统;若未保存/提交,会弹出提示框,引导您保存/提交任务。

    1

  3. 单击 确定提交, 出现测试运行弹出框,选择业务日期:

    1

  4. 单击 运行,会提示您测试运行已经触发成功,可以前往运维中心查看进度。

    1

  5. 单击 前往运维中心,进入运维中心页面后,右键单击任务名,选择 查看节点运行日志

    日志信息如下:

    从日志中可以看出,系统调度参数已经被替换。

    注意:

    为什么选择的业务日期是 20170806 号,替换出的结果是 20170807 呢?其遵循的转换规则为:实际时间=业务日期+1。

应用场景

通过 SHELL 连接数据库

  • 若数据库是在阿里云上搭建的,且区域是华东2,那么需要将数据库对如下白名单开放,即可连接数据库。

    10.152.69.0/24,10.153.136.0/24,10.143.32.0/24,120.27.160.26,10.46.67.156,120.27.160.81,10.46.64.81,121.43.110.160,10.117.39.238,121.43.112.137,10.117.28.203,118.178.84.74,10.27.63.41,118.178.56.228,10.27.63.60,118.178.59.233,10.27.63.38,118.178.142.154,10.27.63.15,100.64.0.0/8

    注意:

    如果是在阿里云上搭建的数据库,但区域不是华东2,则建议使用外网或购买与数据库同区域的 ECS 做为调度资源,将该 SHELL 任务运行在自定义资源组上。

  • 若数据库是自己在本地搭建的,那么建议使用外网连接,且将数据库对上述白名单 IP 开放。

    注意:

    若使用自定义资源组运行 SHELL 任务,必须把自定义资源组的机器 IP 也加到上述白名单中。

     

     

FAQ

关于此文档暂时还没有FAQ
返回
顶部