Hadoop迁移MaxCompute神器之DataX-On-Hadoop使用指南

Hadoop迁移MaxCompute神器之DataX-On-Hadoop使用指南

首页游戏大全迷你DayZ2完整版更新时间:2024-05-11

DataX-On-Hadoop即使用hadoop的任务调度器,将DataX task(reader->Channel->Writer)调度到hadoop执行集群上执行。这样用户的Hadoop数据可以通过MR任务批量上传到MaxCompute、RDS等,不需要用户提前安装和部署DataX软件包,也不需要另外为DataX准备执行集群。但是可以享受到DataX已有的插件逻辑、流控限速、鲁棒重试等等。

1. DataX-On-Hadoop 运行方式

1.1 什么是DataX-On-Hadoop

DataX https://github.com/alibaba/DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、hdfs、Hive、OceanBase、HBase、OTS、MaxCompute 等各种异构数据源之间高效的数据同步功能。 DataX同步引擎内部实现了任务的切分、调度执行能力,DataX的执行不依赖Hadoop环境。

DataX-On-Hadoop是DataX针对Hadoop调度环境实现的版本,使用hadoop的任务调度器,将DataX task(Reader->Channel->Writer)调度到hadoop执行集群上执行。这样用户的hadoop数据可以通过MR任务批量上传到MaxCompute等,不需要用户提前安装和部署DataX软件包,也不需要另外为DataX准备执行集群。但是可以享受到DataX已有的插件逻辑、流控限速、鲁棒重试等等。

目前DataX-On-Hadoop支持将Hdfs中的数据上传到公共云MaxCompute当中。

1.2 如何运行DataX-On-Hadoop

运行DataX-On-Hadoop步骤如下:

./bin/hadoop jar datax-jar-with-dependencies.jar com.alibaba.datax.hdfs.odps.mr.HdfsToOdpsMRJob ./bvt_case/speed.json

本例子的Hdfs Reader 和Odps Writer配置信息如下:

{ "core": { "transport": { "channel": { "speed": { "byte": "-1", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/tmp/test_datax/big_data*", "defaultFS": "hdfs://localhost:9000", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": { "name": "odpswriter", "parameter": { "project": "", "table": "", "partition": "pt=1,dt=2", "column": [ "id", "name" ], "accessId": "", "accessKey": "", "truncate": true, "odpsServer": "http://service.odps.aliyun.com/api", "tunnelServer": "http://dt.odps.aliyun.com", "accountType": "aliyun" } } } ] } }

1.3 DataX-On-Hadoop 任务高级配置参数

针对上面的例子,介绍几个性能、脏数据的参数:

作业级别的性能参数配置位置示例:

{ "core": { "transport": { "channel": { "speed": { "byte": "-1", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": {}, "writer": {} } ] } }

另外,介绍几个变量替换、作业命名参数:

"path": "/tmp/test_datax/dt=${dt}/abc.txt"

任务执行时可以配置如下传参,使得一份配置代码可以多次使用:

./bin/hadoop jar datax-jar-with-dependencies.jar com.alibaba.datax.hdfs.odps.mr.HdfsToOdpsMRJob datax.json -p "-Ddt=20170427 -Dbizdate=123" -t hdfs_2_odps_mr

读写插件详细配置介绍,请见后续第2、3部分。

2. Hdfs 读取

2.1 快速介绍

Hdfs Reader提供了读取分布式文件系统数据存储的能力。在底层实现上,Hdfs Reader获取分布式文件系统上文件的数据,并转换为DataX传输协议传递给Writer。

Hdfs Reader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为DataX协议的功能。textfile是Hive建表时默认使用的存储格式,数据不做压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于DataX而言,Hdfs Reader实现上类比TxtFileReader,有诸多相似之处。orcfile,它的全名是Optimized Row Columnar file,是对RCFile做了优化。据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。Hdfs Reader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前Hdfs Reader支持的功能如下:

  1. 支持textfile、orcfile、rcfile、sequence file、csv和parquet格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
  2. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量。
  3. 支持递归读取、支持正则表达式("*"和"?")。
  4. 支持orcfile数据压缩,目前支持SNAPPY,ZLIB两种压缩方式。
  5. 支持sequence file数据压缩,目前支持lzo压缩方式。
  6. 多个File可以支持并发读取。
  7. csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。

我们暂时不能做到:

  1. 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。后续可以做到支持。

2.2 功能说明

2.2.1 配置样例

{ "core": { "transport": { "channel": { "speed": { "byte": "-1048576", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": { "name": "hdfsreader", "parameter": { "path": "/tmp/test_datax/*", "defaultFS": "hdfs://localhost:9000", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" } ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "," } }, "writer": {} } ] } }

2.2.2 参数说明

"column": ["*"]

{ "type": "long", "index": 0 //从本地文件文本第一列获取int字段 }, { "type": "string", "value": "alibaba" //HdfsReader内部生成alibaba的字符串字段作为当前字段 }

"csvReaderConfig":{ "safetySwitch": false, "skipEmptyRecords": false, "useTextQualifier": false }

"hadoopConfig":{ "dfs.nameservices": "testDfs", "dfs.ha.namenodes.testDfs": "namenode1,namenode2", "dfs.namenode.rpc-address.youkuDfs.namenode1": "", "dfs.namenode.rpc-address.youkuDfs.namenode2": "", "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }

2.3 类型转换

2.3.1 RCFile

如果用户同步的hdfs文件是rcfile,由于rcfile底层存储的时候不同的数据类型存储方式不一样,而HdfsReader不支持对Hive元数据数据库进行访问查询,因此需要用户在column type里指定该column在hive表中的数据类型,比如该column是bigint型。那么type就写为bigint,如果是DOUBLE型,则填写double,如果是float型,则填写float。注意:如果是varchar或者char类型,则需要填写字节数,比如varchar(255),char(30)等,跟hive表中该字段的类型保持一致,或者也可以填写string类型。

如果column配置的是*,会读取所有column,那么datax会默认以string类型读取所有column,此时要求column中的类型只能为String,CHAR,VARCHAR中的一种。

RCFile中的类型默认会转成DataX支持的内部类型,对照表如下:

RCFile在Hive表中的数据类型 DataX 内部类型 TINYINT,SMALLINT,INT,BIGINT Long FLOAT,DOUBLE,DECIMAL Double String,CHAR,VARCHAR String BOOLEAN Boolean Date,TIMESTAMP Date Binary Binary 2.3.2 ParquetFile

如果column配置的是*, 会读取所有列; 此时Datax会默认以String类型读取所有列. 如果列中出现Double等类型的话, 全部将转换为String类型。如果column配置读取特定的列的话, DataX中的类型和Parquet文件类型的对应关系如下:

Parquet格式文件的数据类型 DataX 内部类型 int32, int64, int96 Long float, double Double binary Binary boolean Boolean fixed_len_byte_array String textfile,orcfile,sequencefile:

由于textfile和orcfile文件表的元数据信息由Hive维护并存放在Hive自己维护的数据库(如mysql)中,目前HdfsReader不支持对Hive元数据数据库进行访问查询,因此用户在进行类型转换的时候,必须指定数据类型,如果用户配置的column为"*",则所有column默认转换为string类型。HdfsReader提供了类型转换的建议表如下:

DataX 内部类型 Hive表 数据类型 Long TINYINT,SMALLINT,INT,BIGINT Double FLOAT,DOUBLE String String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY Boolean BOOLEAN Date Date,TIMESTAMP 其中:

特别提醒:

2.4 按分区读取

Hive在建表的时候,可以指定分区partition,例如创建分区partition(day="20150820",hour="09"),对应的hdfs文件系统中,相应的表的目录下则会多出/20150820和/09两个目录,且/20150820是/09的父目录。了解了分区都会列成相应的目录结构,在按照某个分区读取某个表所有数据时,则只需配置好json中path的值即可。

比如需要读取表名叫mytable01下分区day为20150820这一天的所有数据,则配置如下:

"path": "/user/hive/warehouse/mytable01/20150820/*"


3. MaxCompute写入

3.1 快速介绍

ODPSWriter插件用于实现往ODPS(即MaxCompute)插入或者更新数据,主要提供给etl开发同学将业务数据导入MaxCompute,适合于TB,GB数量级的数据传输。在底层实现上,根据你配置的 项目 / 表 / 分区 / 表字段 等信息,通过 Tunnel写入 MaxCompute 中。支持MaxCompute中以下数据类型:BIGINT、DOUBLE、STRING、DATATIME、BOOLEAN。下面列出ODPSWriter针对MaxCompute类型转换列表:

DataX 内部类型 MaxCompute 数据类型 Long bigint Double double String string Date datetime Boolean bool 3.2 实现原理

在底层实现上,ODPSWriter是通过MaxCompute Tunnel写入MaxCompute系统的,有关MaxCompute的更多技术细节请参看 MaxCompute主站: https://www.aliyun.com/product/odps

3.3 功能说明

3.3.1 配置样例

{ "core": { "transport": { "channel": { "speed": { "byte": "-1048576", "record": "-1" } } } }, "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0 } }, "content": [ { "reader": {}, "writer": { "name": "odpswriter", "parameter": { "project": "", "table": "", "partition": "pt=1,dt=2", "column": [ "col1", "col2" ], "accessId": "", "accessKey": "", "truncate": true, "odpsServer": "http://service.odps.aliyun.com/api", "tunnelServer": "http://dt.odps.aliyun.com", "accountType": "aliyun" } } } ] } }

3.3.2 参数说明

作者:隐林

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved