A few words up front

Start from the Hive CREATE TABLE statement:
CREATE TABLE `ods.log_info`(
  log_id string,
  log_type string,
  original_msg string,
  create_time bigint,
  update_time bigint)
PARTITIONED BY (
  `d_date` string COMMENT 'snapshot date',
  `pid` string COMMENT 'channel id')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
The table's input format is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.

Looking at the MapredParquetInputFormat class:
protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) {
  this.realInput = inputFormat;
  vectorizedSelf = new VectorizedParquetInputFormat(inputFormat);
}
Looking at the ParquetInputFormat class (the getSplits method):
public static boolean isTaskSideMetaData(Configuration configuration) {
  return configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE);
}
public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
  Configuration configuration = ContextUtil.getConfiguration(jobContext);
  List<InputSplit> splits = new ArrayList();
  if (!isTaskSideMetaData(configuration)) { // configuration.getBoolean("parquet.task.side.metadata", Boolean.TRUE) returns true by default
    splits.addAll(this.getSplits(configuration, this.getFooters(jobContext)));
    return splits;
  } else {
    // the parent class getSplits returns a List<InputSplit>; take its iterator
    Iterator i$ = super.getSplits(jobContext).iterator();
    while(i$.hasNext()) {
      InputSplit split = (InputSplit)i$.next();
      Preconditions.checkArgument(split instanceof FileSplit, "Cannot wrap non-FileSplit: " + split);
      splits.add(ParquetInputSplit.from((FileSplit)split));
    }
    return splits;
  }
}
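Since parquet.task.side.metadata defaults to true, the branch that runs in practice simply wraps the splits returned by the parent class. A minimal sketch (the class name and values are mine, for illustration only) of how that flag can be inspected or overridden through a Hadoop Configuration:

import org.apache.hadoop.conf.Configuration;

public class TaskSideMetadataCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Defaults to true, which is why getSplits() above delegates to the
        // parent class's getSplits() and merely wraps each FileSplit.
        boolean taskSide = conf.getBoolean("parquet.task.side.metadata", true);
        System.out.println("parquet.task.side.metadata = " + taskSide);

        // Hypothetical tuning (not something this article does): setting it to false
        // would make ParquetInputFormat read file footers on the client side and
        // build splits from row-group metadata instead.
        conf.setBoolean("parquet.task.side.metadata", false);
    }
}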
public class ParquetInputFormat<T> extends FileInputFormat<Void, T>

Since ParquetInputFormat extends FileInputFormat, the call to super.getSplits(jobContext) lands in FileInputFormat.getSplits. Stepping into the parent class (getSplits):
Get the minimum split size, minSize:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
protected long getFormatMinSplitSize() {
  return 1;
}
public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";

public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
Get the maximum split size, maxSize:
long maxSize = getMaxSplitSize(job);
public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";

public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}
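So minSize and maxSize come straight from the job configuration. For a plain MapReduce job, the same two keys can also be set through FileInputFormat's helper methods; a small sketch (the class name and the 128 MB value are illustrative assumptions):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfig {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        // Equivalent to: set mapreduce.input.fileinputformat.split.minsize=1
        FileInputFormat.setMinInputSplitSize(job, 1L);

        // Equivalent to: set mapreduce.input.fileinputformat.split.maxsize=128000000
        FileInputFormat.setMaxInputSplitSize(job, 128_000_000L);

        System.out.println(job.getConfiguration()
                .getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE));
    }
}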
List all files under the input path: listStatus(job)

Loop over those files and get each file's size

Check whether each file is splittable: isSplitable(job, path)
protected boolean isSplitable(JobContext context, Path filename) {
  return true; // splittable by default
}
Compute the split size:
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
The computeSplitSize method:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
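Before looking at the full method, it helps to plug in concrete numbers. Here is a minimal standalone sketch (the 256 MB block size matches the cluster described below; the maxsize values are the ones tested at the end) that re-implements the same formula:

public class ComputeSplitSizeDemo {

    // Same formula as FileInputFormat.computeSplitSize()
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 256L * 1024 * 1024; // assumed HDFS block size of 256 MB
        long minSize = 1L;                   // default mapreduce.input.fileinputformat.split.minsize

        for (long maxSize : new long[]{128_000_000L, 256_000_000L, 512_000_000L}) {
            System.out.println("maxsize=" + maxSize
                    + " -> splitSize=" + computeSplitSize(blockSize, minSize, maxSize));
        }
        // Note: with the default minsize of 1, this formula never returns a value
        // larger than blockSize, no matter how large maxsize is set.
    }
}

For reference, here is the full FileInputFormat.getSplits():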
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // compute the minimum split size
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // compute the maximum split size
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // list all input files; names starting with . or _ are filtered out
  List<FileStatus> files = listStatus(job);
  // loop over every file under the input paths
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // check whether the file is splittable; the default implementation returns true
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // split size = Math.max(minSize, Math.min(maxSize, blockSize))
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // private static final double SPLIT_SLOP = 1.1; // 10% slop
        // keep slicing the file while remaining size / splitSize > 1.1
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // whatever remains becomes one final split
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splittable: the whole file becomes a single split
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      // Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  // public static final String NUM_INPUT_FILES = "mapreduce.input.fileinputformat.numinputfiles";
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits; // return all split metadata
}
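One detail in the loop above is SPLIT_SLOP: a file keeps getting sliced only while the remaining bytes exceed 1.1 × splitSize, so a last chunk of up to 1.1 × splitSize is kept as a single split instead of being cut into a full split plus a tiny remainder. A minimal sketch of just that loop (the file sizes are made-up examples):

public class SplitSlopDemo {

    private static final double SPLIT_SLOP = 1.1; // same 10% slop as FileInputFormat

    // Counts how many splits a single file of the given length would produce.
    static int countSplits(long length, long splitSize) {
        int splits = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the tail becomes one last split
        }
        return splits;
    }

    public static void main(String[] args) {
        long splitSize = 256L * 1024 * 1024; // assume a 256 MB split size

        // 270 MB: 270/256 ≈ 1.05 <= 1.1, so the whole file stays in one split.
        System.out.println(countSplits(270L * 1024 * 1024, splitSize));

        // 300 MB: 300/256 ≈ 1.17 > 1.1, so it becomes a 256 MB split plus a 44 MB split.
        System.out.println(countSplits(300L * 1024 * 1024, splitSize));
    }
}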
The core code

Compute the minimum:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // controlled by mapreduce.input.fileinputformat.split.minsize

Compute the maximum:
long maxSize = getMaxSplitSize(job); // controlled by mapreduce.input.fileinputformat.split.maxsize

Compute splitSize:
Math.max(minSize, Math.min(maxSize, blockSize));

blockSize is the HDFS block size (256 MB by default on this cluster).
Conclusion

To increase the number of splits (and thus map tasks), lower maxsize:

set mapreduce.input.fileinputformat.split.maxsize=128000000;

To reduce the number of splits in the map stage, raise maxsize:

set mapreduce.input.fileinputformat.split.maxsize=512000000;
Test

Test SQL (hive.support.quoted.identifiers=None enables the regex column specification `(is_settled)?+.+`, which selects every column except is_settled):

set hive.support.quoted.identifiers=None;
set spark.app.name="ods.log_info";

insert overwrite table ods.log_info_temp partition(d_date='2021-02-21', pid)
select `(is_settled)?+.+`
from ods.log_info
where 1 > 0
The results are as follows:

set mapreduce.input.fileinputformat.split.maxsize=128000000;   task count: 65

set mapreduce.input.fileinputformat.split.maxsize=256000000;   task count: 65

set mapreduce.input.fileinputformat.split.maxsize=512000000;   task count: 34
