Distcp 跨集群同步

Distcp 跨集群同步

1. 使用方法及原理

1
2
hadoop distcp srcPath tarPath
会把srcPath和tarPath下的所有目录、文件信息放入到 _distcp_src_files 和 _distcp_dst_files 两个临时文件中,具体文件内容的拷贝工作交给多个map任务,会导致一个问题,就是文件过多那么map任务数就会很多,每个datanode有一个上限,极少数情况下可能会超过上限,导致数据文件拷贝不全。

org.apache.hadoop.tools.DistCp 类中会解析 srcPath tarPath ,将之前的临时文件先删除,然后一个含有随机数的临时文件夹路径

1
2
3
4
5
6
private Path createMetaFolderPath() throws Exception {
Configuration configuration = this.getConf();
Path stagingDir = JobSubmissionFiles.getStagingDir(new Cluster(configuration), configuration);
Path metaFolderPath = new Path(stagingDir, "_distcp" + String.valueOf(rand.nextInt()));
return metaFolderPath;
}

得到临时文件的路径,SequenceFile文件,即Key/Value结构的序列化文件,这个文件里将存放所有需要拷贝的源目录/文件信息列表。其中Key是源文件的Text格式的相对路径,即relPath;而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类。

1
2
3
4
5
protected Path getFileListingPath() throws IOException {
String fileListPathStr = this.metaFolder + "/fileList.seq";
Path path = new Path(fileListPathStr);
return new Path(path.toUri().normalize().toString());
}

在createJob()里主要有两个地方需要注意

1
2
3
4
5
private Job createJob() throws IOException {
job.setInputFormatClass(DistCpUtils.getStrategy(this.getConf(), this.context));
job.setJarByClass(CopyMapper.class);
return job;
}

setInputFormatClass() 保证了mapper的数据读取格式是从getStrategy(getConf(), inputOptions)得到的,底层是由 UniformSizeInputFormat.class 类定义的,这个类继承自InputFormat.class,MR中所有的输入格式类都继承自InputFormat,这是一个抽象类。
InputFormat抽象类仅有两个抽象方法

  • ListgetSplits(),获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题
  • RecordReader<K,V>createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题

CopyMapper.class中则定义了每个map的工作逻辑,也就是拷贝的核心逻辑。这个类里最核心的两个方法是 setup()和map()。setup()中完成map方法的一些初始化工作,在DISTCP中,这个方法里会设定对端的目标路径,并做一些参数设置和判断工作

而 map(Text relPath, CopyListingFileStatus sourceFileStatus, Context context) 中通过参数可以发现就是对UniformSizeInputFormat类里分片后的数据里的每一行进行处理,每行里存放的就是 fileList.seq文件每行的内容。

CommonCliOptions 解析命令行参数

2. 几个问题

2.1 文件数量过多,导致map过多,超过datanode上限,导致数据文件拷贝不全

2.2 提示信息非常少,如果是 调度器 -> 作业 -> distcp -> mrJob, 此时输出的信息应该是 mrJob的,也就是distcp的信息,而不是作业的信息,可能会导致调度器无法拿到作业执行信息从而导致作业调度失败

2.3 长尾问题 ?

3. 分区表的跨集群同步问题

不管是普通的表还是分区表,都需要刷新元数据,invalidate metadata table xxx

分区表还需要手动添加分区,alter table xxx add if not exists partition(key1="value1", key2="value2") partition(key1="value3", key2="value4")

hive shell 通过 msck repair table xxx 可以自动去读取hdfs下文件信息,来添加元数据中不存在的分区信息,但是存在jdbc连接时不识别 msck 指令的情况

4. hive同步phoenix

官网地址

官网提供的一个方案是可以在hive创建表,然后存储上指定phoenix的一个方法,org.apache.phoenix.hive.PhoenixStorageHandler,通过一些配置来完成。但是会存在一些问题,比如hive与phoenix的数据类型转换问题,bigint、int;还有string的长度限制等

∴ 跨集群表同步(parquet格式) + hive同步phoenix的方案解决如下

  1. 首先将集群1的hive表通过元数据直接复制到集群2来创建
  2. 对于未分区的表,直接复制文件
1
2
3
FSDataInputStream input = batchClusterFS.open(path);
FSDataOutputStream output = queryClusterFS.create(new Path(queryHiveTablePath,fileName), true);
IOUtils.copyBytes(input, output,queryClusterFS.getConf(), true);

如果是分区的表(eg:impala),递归复制文件夹下的所有文件

1
2
3
4
5
6
7
8
Arrays.stream(files).parallel().forEach(dirPath -> {
try {
FileUtil.copy(batchClusterFS, dirPath.getPath(), queryClusterFS, new Path(queryHiveTable.getSd().getLocation()), false, queryClusterFS.getConf());
} catch (IOException e) {
logger.error(">> 分区表拷贝失败 dirPath: {}, error: {} ", dirPath, e);
table.setDistStatus(CrossClusterHiveTableCopy.FAIL_STATUS);
}
});

都需要刷新元数据信息

  1. 遍历集群2hive表对应的hdfs文件,需要将hive中字段的类型和phoenix类型进行转换,org.apache.phoenix.schema.types,比如hive的string转phoenix的varchar;hive的array转phoenix的VARCHAR[]。

如果一个字段长度可能会一直变,考虑到不能随便修改phphonenix的字段长度,也不能把字段长度设置的太大,可以将hive的字段设置为array,对应phoenix的VARCHAR[],这就是一个变长字符数组。
hive中的array类型,从hive元数据中可以查到以下信息:data_type=2003,column_size=0;对应phoenix的数据类型是VARCHAR[], column_size为null,是变长数组

1
2
3
4
5
6
7
8
9
10
11
这里的dataType是varchar[];PVarcharArray 是 org.apache.phoenix.schema.types.PVarcharArray,是phoenix的类型
if (dataType instanceof PVarcharArray) {
// org.apache.hadoop.io.ArrayWritable
ArrayWritable aw = (ArrayWritable) value;
ArrayList strArr = new ArrayList();
for (Writable wr : aw.get()) {
strArr.add(wr.toString());
}
Array array = phoenixConn.createArrayOf("VARCHAR", strArr.toArray());
JobLogger.log(">> type: varchar[] size {} ", strArr.size());
statement.setArray(i + 1, array);