暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

大数据知识点全面盘点笔记(一)

大数据真有意思 2021-03-21
328

点击关注上方“知了小巷”,

设为“置顶或星标”,第一时间送达干货。

一、Linux & Shell
二、Hadoop
三、Zookeeper
四、Flume
五、Kafka
六、Hive
七、Sqoop
八、Azkaban

一、Linux & Shell

  1. 列一下比较熟悉的常用命令
df -h 
find 
ps -ef
iotop
free
tree
top
lsof -i

  1. 查看端口号、查看进程、查看磁盘空间
netstat
lsof -i
ps -ef
ps -aux
top
df -h
du --max-depth 1 -h

  1. 高级命令sed awk cut sort

大数据基础:Linux操作系统(下)

大数据基础:Linux操作系统(上)

大数据面试题十五 · Linux基础

  1. 单引号和双引号区别以及引号嵌套
单引号 ''; 双引号 ""
'$do_date' -> $do_date
"$do_date" -> 2020-11-16

"'$do_date'" -> '2020-11-16'
'"$do_date"' -> "$do_date"

  1. union与union all区别(和SQL语义一致)

union 去重
union all 不去重

  1. 进程号未知,如何一条命令退出进程

比如flume

ps -ef  | grep  flume | -V grep | awk | xargs kill -9

  1. 写过哪些Shell脚本
  • 启动停止、分发
#!/bin/bash

case $1 in
"start") {
 for i in hadoop101 hadoop102 hadoop103 hadoop104 hadoop105 hadoop106
 do
  # ssh $i "绝对路径 启动命令"
 done
};;
"stop"){
 for i in hadoop101 hadoop102 hadoop103 hadoop104 hadoop105 hadoop106
 do
  # ssh $i "绝对路径 停止命令"
 done
};;
# case命令可类比C语言的switch/case语句
# esac表示case语句块的结束
esac

  • 数仓层级内部ods->...->ads
#!/bin/bash

# 定义变量
# 获取时间(传进来一个日期,或者当前日期减1)

# 一些公用变量定义到外面,比如库名、项目名、时间、自定义函数加上库名等
# 编写 sql = ""

# 执行sql 比如 hive -e ...
# 也可以单独把sql写到模板文件里面,使用sed命令给sql文件赋值变量,再执行
# sed -i "s|paramsdaystr|$daystr|g" xxx.sql
# 比如 impala-shell -i ip -f xxx.sql

# 为lzo压缩文件手动添加索引

  • 数仓项目和关系存储(比如MySQL)的导入导出,比如sqoop datax

二、Hadoop

  1. Hadoop基础
  • 端口号
    版本:2.x 3.x
版本HDFS-WebYARN-WEB历史服务器客户端访问端口
2.x500708088198889000/8020
3.x98708088198889000/8020
  • 配置文件及配置项
    版本:2.x 3.x
    环境变量可直接配成系统的,/etc/profile/my_env.sh
2.xhdfs-site.xmlcore-site.xmlyarn-site.xmlmapred-site.xmlslaves
3.xhdfs-site.xmlcore-site.xmlyarn-site.xmlmapred-site.xmlworkers
  1. HDFS

HDFS基础面试题总结

  • HDFS读写流程
  • 小文件有哪些危害
    (1)占用NameNode内存
    (2)增加MapTask数量,消耗过多资源
  • 怎么解决小文件问题
    (1)har归档、自定义InputFormat(很少使用了)
    (2)CombineTextInputFormat
    将大量小文件合并到一起,统一做切片 (3)JVM重用
    任务启动和结束时间比任务运行时间还要多,或者叠加耗时较长,避免多次启动和关闭所花费的时间;
    JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值在mapred-site.xml中配置:
    mapreduce.job.jvm.numtasks (10~20;-1表示不限制N)
  • HDFS副本数,默认3
  • 数据块大小
版本或场景块大小
1.x64M
2.x128M
3.x128M
2.x/3.x本地32M
Hive默认256M
业界128M/256M
  1. MapReduce

MapReduce和YARN基础面试题总结

Shuffle及其优化

3.1. Shuffle的定义

在MapReduce中,map方法之后,reduce方法之前,混洗的过程就是Shuffle。
本质是group by对数据进行分组(聚合)。

3.2. Map

  • Map阶段,两次排序(快排 + 归并)
  • getPartition(标记数据是哪个分区):自定义分区器,尽可能将数据打散,避免数据倾斜;
  • 环形缓冲区,内存-反向溢写(默认100M,可以调到200M):默认80%(可以调到90%~95%);
  • 溢写:对Key的索引按照字典顺序进行快速排序,产生大量的溢写文件,然后对指定分区的数据进行归并(归并排序),最后持久化到磁盘,等待Reduce端来拉取数据;
  • 在不影响最终业务逻辑的前提下,可以对溢写文件做一个Combiner;默认一次归并10个文件,服务器内存性能比较好,可以提高到15~20;
  • 在落盘之前,可以对数据进行压缩,减少磁盘和后续的网络IO;采用Snappy或Lzo压缩算法。

对于压缩:

  • 数据量小,仍然考虑快
  • 数据量大,考虑切片Split(Bzip2和Lzop-要创建索引)

3.3. Reduce

  • 去Map端拉取指定分区的数据,放到内存,内存不够溢写到磁盘(也会做归并),进入到Reduce方法;默认是一次拉5个MapTask的分区数据(可以调整到10~20);
  • 内存,增加Reduce内存,归并 reduce方法,还可以做分组排序;

压缩,看实际需求:

  • 如果数据需要永久保存,压缩的就越小越好;
  • 作为下一个MR的输入:看上面的Map压缩。

3.4. 内存配置

生产服务器:128G;

NodeManager默认堆内存大小:8G 可调整为100G左右;
单个任务(Job)默认内存:8G;
MapTask默认内存:1G(注意MapTask堆内存的配置);
ReduceTask默认内存:1G(注意ReduceTask堆内存的配置)。

处理128M数据,需要1G内存;处理1G数据,需要8G内存;处理2G数据,需要16G内存...

数据是否支持切片,如果不支持切片,则内存需要往上调;默认如果1G数据不支持切片,需要4~6G内存。
还有就是总内存与堆内存,不同的配置项。

3.5. 工作线程池大小

dfs.namenode.handler.count=20*math.loge(8)

比如集群规模为8台,工作线程池大小设置为41左右:

int(20*math.loge(8)) = 41

3.6 MapReduce中,分片、分区、排序和分组(Group)的关系

分片的数量等于启动的MapTask的数量。默认情况下,分片的大小就是HDFS的blockSize;
分区是分割map每个节点的结果,按照key分别映射给不同的reduce。分区的设置需要与ReduceTaskNum配合使用。比如想要得到5个分区的数据结果。那么就得设置5个ReduceTask。另外,几个分区数,就会产生几个结果文件。

  1. YARN

(1)YARN调度流程

(2)YARN调度策略 调度器:FIFO,公平,容量;
默认调度器:Apache->容量调度器;CDH->公平调度器;
(3)FIFO调度器特点:
单个队列,先进先出,同一时间只有一个APP执行。
(4)公平调度器特点:
多个队列,队列中每个APP公平享有队列资源,并发度是最高的;
队列之间也可以相互整合资源。
(5)容量调度器特点:
多个队列,每个队列分配相应的资源;底层是FIFO调度器;
一个队列中,优先满足先进入的APP;
队列之间可以相互整合(借用和抢占)资源。
(6)业界选择:并发度较高,选公平;普通企业,并发度不高,选容量。
(7)容量调度器默认只有一个default队列
按照框架/执行引擎创建队列:Hive Spark Flink;
按照业务模块或场景:登录注册、购物、物流模块;
好处:防止递归死循环把整个集群资源全部被耗尽;多队列相互解耦,降低风险;数据量特别大,无法保证所有任务都能正常完成,需要降级运行。

  1. 大数据集群基准测试

需要对HDFS读写性能和MR计算能力做测试,测试相关jar包在hadoop的share文件夹下面。

10T数据,需要花多长时间上传到HDFS;100T呢...读呢

  1. Hadoop宕机
  • MR造成,控制YARN同时运行的APP,每个APP申请的最大内存
    yarn.scheduler.maximum-allocation-mb(默认8GB)
  • 写文件块造成NameNode宕机。控制数据写入速度。
  1. 数据倾斜

大量数据进入同一个Reduce。

  • Map端提前聚合,进行Combiner,减少数据量传输

  • 导致数据倾斜的Key,大量分布在不同的Mapper
    (1)局部聚合(随机打散) + 全局聚合(去掉随机前缀)
    (2)增加Reduce数量,提升并行度
    (3)实现自定义分区(散列函数)

  • 空值处理
    null +1
    null +2
    null +3

  1. 集群资源分配参数

假如有30台机器,跑MR任务的时候,发现5个Map任务全部分配到了同一台机器上:
yarn.scheduler.fail.assignmultiple
 需要关掉。

可以在一个节点上同时开启20个容器(考虑节点距离)。

三、Zookeeper

  1. ZK选举
    半数机制

  2. 常用命令
    ls create delete get

  3. 安装多少台
    10台服务器,安装 3;
    20台服务器,安装 5;
    50台 7,100台 11(不再增加);
    台数多,通信时间变长,同步数据耗时,可用性会受到影响。

  4. Paxos算法

  5. CAP (ZK是CP,强一致,分区容错)

四、Flume

  1. Flume组成
  • taildir source
    (1)断电续传、多目录
    (2)版本:apache->1.7;CDH->1.6
    (3)没有taildir source,怎么实现断点续传,自定义source
    (4)原理:读取文件,偏移量offset
    (5)Flume挂了怎么办?
    有可能导致数据重复、但是不会丢失数据。
    (6)处理重复数据
    不处理:概率小,造成的负面影响可以忽略和接受;
    处理:自定义source,实现事务(不建议);下游处理:hive的dwd层等去重,比如group by或开窗取第一条。
    (7)是否支持递归:不支持
    自定义实现。递归遍历文件夹 + taildir
  • channel
    (1)file Channel 基于磁盘,可靠性高、性能低
    (2)memory Channel 基于内存,可靠性低、性能高
    (3)kafka Channel 数据存储在kafka集群(基于磁盘),可靠性高、性能优于memory Channel + kafka sink
    kafka Channel版本:apache->1.6(有BUG 头+内容;parseAsFlumeEvent不起作用),1.7版本解决掉了这个BUG
    (4)生产选择:下游是kafka,优先选择kafka Channel;如果下游不是kafka,金融类公司,选择file Channel;普通日志,选择memory Channel
  • hdfs sink
    小文件;文件大小,128M;时间:1~2小时;event个数:每个event大小不一样,文件块有大有小,不利于后期计算
  1. Flume拦截器、选择器、监控器
  • 拦截器 
    (1)常用到的拦截器
    ETL拦截器:数据清洗;或者下游处理。
    Flume更换为日志数据本身的时间。
    (2)自定义拦截器
    定义一个Java类,实现interceptor接口,重写4个方法;
    初始化、关闭、单event处理、多event处理;
    实现一个静态内部类Builder;
    打包上传到flume/lib目录下面;
    在配置文件中关联到 自定义类的全限定名+$builder。

  • 选择器
    replicating 默认选择器:把数据发送到下游所有的Channel;
    multiplexing 多路复用选择器:选择性发往自己指定的Channel。

  • 监控器
    ganglia:尝试提交次数、最终成功的次数,如果差值比较大,说明发生了大量重试,flume性能需要优化。
    (1)提高内存,默认2000M(2G) flume-env.sh 提高到4~6G (2)增加flume节点数 日志服务器配置:4~8、16G(业务应用)

Flume优化:

  • file Channel 配置多目录(挂载多块磁盘),提高吞吐量
  • 调整内存大小;扩展flume节点数

Flume节点挂了怎么办?
(1)memory Channel有风险,可能会丢失数据,默认存储100个Event;最多丢失100个;File Channel默认存储100万个Event
(2)source有可能会重复数据,可以在下游做去重处理

五、Kafka

  1. Kafka基础
  • Producer Broker Consumer Zookeeper
  • ZK:没有Producer信息;存储了Broker相关信息;存储了Consumer相关信息(高低版本有差异)
  • 生产环境多少台Broker节点:一般3台节点(2 * n + 1)足够、压测 + 动态调整确认;
    n是生产者峰值的生产速率 * 副本 100;峰值50MB/秒的时候是3台服务器;
  • 压测:生产者峰值的生产速率、消费者消费的峰值消费速率
  • TopicPartition副本:2~3,2个的居多,默认是1个副本
    副本越多,可靠性越高;通信效率会降低
  • Kafka的数据量
    100万日活、1人每天100条、1条日志0.5K~2K之间;
    平均速率:100万 * 100条 (3600 * 24) = 1150条/秒;
    每秒1MB/秒;
    一天当中,哪个时间段,数据峰值最高,和业务本身高度相关,比如20M<50M。
  • Kafka默认数据保存7天时间;或者保存3天。不同数据设置不同的保存时间,比如实时数仓(6个小时)。
  • 磁盘空间预留多少
    100G日志量 * 副本数2 * 3天 0.7 (保留30%磁盘余量)
  • Kafka监控 Kafka Eagle:开源
  • 分区数 一般3~10个 设置1个分区,然后做压测,得到生产者和消费者数据速率tp和tc;
    期望吞吐量为t,并发度-分区数;
    分区数 = t min(tp, tc);
    假如100MB/秒,tp是20MB/秒,tc是50MB/秒;
    分区数 = 100 20 = 5个分区。
  • 分区分配策略(消费者) RangeAssignor(The range assignor works on a per-topic basis);
    3个KafkaConsumer实例消费10个分区:
    0 1 2 3
    4 5 6
    7 8 9
    RoundRobinAssignor(uniformly distributed);
    注意可能导致数据倾斜的情况;Hash打散 + 轮询消费,减少数据倾斜。
  • ISR
    主要是解决:Leader挂掉了,下一个Leader是谁?在ISR中的都有机会称为新的Leader。
    延迟时间、延迟条数(旧版本);新版本:延迟时间。
  • 有多少个Topic flume/canal -> kafka;满足下游所有消费者(比如:spark hive kylin flink);
    业务模块或场景划分,避免重复解析。
  1. Kafka Broker挂掉

上下游影响,上游积压,下游空跑。
flume:有Channel,可以缓冲数据;
日志服务器,有30天的数据。

  1. 消息丢失

ack配置:

  • 0,只发送,不管发送结果,效率最高,可靠性差
  • 1,发送,1个Leader应答,效率中等,可靠性中等
  • -1,发送,Leader和Follower都应答,效率最低、可靠性最高

生产选择:
金融类公司:-1;普通日志:选择1

  1. 消息重复

事务、幂等性 + ack=-1;交给下游处理(hive dwd;spark streaming等);
事务:单分区、单会话(Session);
金融公司:最好不要使用Kafka,可以使用RocketMQ;或者使用Kafka事务 + 幂等性。
普通日志:下游去重处理。

  1. 消息积压
  • 消费能力不足,增加TopicPartition数量,同时提升消费组的消费者数量,消费者数=KafkaConsumer实例数=分区数;消费者所在机器的CPU核数也要相应增加。
  • 加快消费者消费速度:下游数据处理及时能力有待提高,比如增加每批次拉取的数据量(batchSize)。
  1. Kafka优化
  • 日志保留策略:Kafka磁盘数据保存时间7天->3天
  • 副本调整为2个
  • 采用压缩
  • 调整内存:默认1G -> 4~6G;可以考虑增加Broker节点数
  1. Kafka高效读写数据
  • 集群、分区、顺序读写600M/秒;随机读写100M/秒
  • 零拷贝技术
  1. Kafka传输一条2M的日志会有什么问题?

Kafka对于消息体的大小默认是单条最大1M,如果消息大于1M,生产者无法将消息推送到Kafka,或者消费者无法去消费Kafka里面的数据,需要调整配置:

# broker可复制的消息的最大字节数,默认为1M
replica.fetch.max.bytes: 1048576 
# kafka会接收单个消息size的最大限制,默认1M左右
message.max.bytes: 1000012
# message.max.bytes必须小于replica.fetch.max.bytes
# 否则会导致replica之间同步数据失败

  1. Kafka过期数据清理

保证数据没有被引用(没人消费);日志清理策略:delete和compact(压缩)

# 启用删除或压缩策略
# 生产一般都是直接过期删除
log.cleanup.policy=delete/compact

  1. Kafka按照时间消费数据

可以根据时间拿到offset,再从对应offset开始消费

// partitionInfos
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>();

// timestampsToSearch
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();

for (PartitionInfo partitionInfo : partitionInfos) {
    // topicPartitions
    topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    // timestampsToSearch
    timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
}

consumer.assign(topicPartitions);

Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
    offsetTimestamp = entry.getValue();
    if (offsetTimestamp != null) {
        int partition = entry.getKey().partition();
        long timestamp = offsetTimestamp.timestamp();
        long offset = offsetTimestamp.offset();      
        // 设置读取消息的偏移量
        consumer.seek(entry.getKey(), offset);
    }
}

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        String value = record.value();
        // ...
    }
}

  1. Kafka消费者拉取数据还是接收推送数据

拉取数据

  1. Kafka数据顺序性

分区内有序;分区之间无序

六、Hive

  1. Hive组成

默认derby数据库;一般存储在MySQL里面;
编译器 选择器 优化器 执行器;
默认引擎:MR 已过时,建议Tez、Spark;
MR引擎:统计周指标、月指标、年指标、数据量比较大
Tez引擎:即席查询,临时使用 测试 Spark引擎:每天的固定定时任务(基于内存和磁盘)

  1. Hive与MySQL的区别

hive:数据量大的时候快,擅长大数据量查询 mysql:数据量小的时候快,小数据量的增删改查

  1. 内部表、外部表

删除数据:元数据和原始数据;
内部表:元数据和原始数据都会被删掉;外部表:只删除元数据

在生产环境使用中,什么时候创建内部表、外部表?
绝大多数表都是外部表;自己使用的(临时)表一般使用内部表。

  1. 4个by

order by:全局排序,慎用,易于导致数据倾斜

sort by:排序

distribute by:分区
分区内排序,sort by , distribute by

cluster by:排序、分区 字段相同时,使用cluster by

  1. 系统函数
  • 日(date_add date_sub)、周(next_day)、月(date_format last_day);
  • 解析json(get_json_object);
  • 判空 nvl;
  • 多行边一行 collect_set() sum...;
  • 一行变多行 炸裂函数 explode()
  1. 自定义函数

UDF(一进一出:行)、UDTF(多进多出、一进多出)、UDAF(多进一出)
有系统函数可以解决,为什么还要用自定义函数??更加灵活、方便调试和定位错误(数据比较复杂,可以增加外部引用的jar包,比如地图经纬度计算、IP地址、json嵌套)

  • 自定义UDF的步骤:定义类继承UDF、重写evaluate方法;
  • 自定义UDTF的步骤:定义类继承GenericUDTF,重写三个方法 初始化(定义返回值名称、对返回值类型校验、对输入参数个数类型进行校验)、process(Object[0]=>jsonarray=>forward)、关闭资源close

打包并上传到HDFS路径下,在Hive客户端创建函数create function func_xxx...;变更jar包即可更新函数逻辑

  1. 窗口函数

rank

over topN

  1. Hive优化

17 or 30+

  • mapjoin默认打开,不要关闭掉
  • 行列过滤 join where => where join
  • 分区
  • 小文件处理 CombineHiveInputformat 减少切片Split,进而减少MapTask;
    JVM重用 减少JVM结束和启动时间;
    merge 在maponly任务中是默认打开的,将小于16M的文件合并到256M,如果是reduce任务,需要开启
  • 压缩
  • 列存储 加快查询速度
  • 提前Combiner 不能影响最终业务逻辑
  • 合理选用引擎 mr tez spark
  • 合理设置map个数和reduce个数 split = max(0, min(blocksize, Long.MAX)) 块大小
    128M 数据 => 1G 内存
  1. 数据倾斜

怎么判断发生了数据倾斜?怎么判断优化起到了效果?
Reduce Tasks for job_xxxxxxxxxx_mmmm
其中有一个任务卡主99%,占用时间最长

  • 大多数任务都已经执行完,个别1~2个任务没有执行完,卡在99%左右
  • 不同数据类型关联产生数据倾斜(越界,int join string)
    t_user#user_id int => t_log#user_id string
  • 空值分布 自定义分区,key 随机数或123...456

group by优于distinct(新版hive已优化);
mapjoin;
开启负载均衡,任务拆成两个阶段(set hive.groupby.skewindata=true);
设置reduce个数

// 倾斜案例
select xxx from t1 left join on t2 where t1.aid=t2.sid left join t3 on t1.uid=t3.uid

null量在千万级别,直接把null干掉;发现reduce只有一个,手动设置为800个;开启负载均衡,试试还是99%;数据类型,bigint vs string string,cast(t1.sid as string) = t2.sid,bigint超出了范围越界了(越界匹配,注意Hive新版本)。

  1. Hive字段分隔符默认\001,\t

sqoop \t,数据导入会发生异常;内容和分隔符要有规范要求,数据清洗掉。

MySQL元数据备份: 元数据丢掉了,整个数仓就挂掉了,Keepalived;
MySQL utf8超过字节数问题: 主要是表情符号 注意采用utf8mb4

七、Sqoop

  1. 在使用Sqoop过程中遇到哪些问题,怎么解决的?
  • 空值问题
    hive mysql null \N null \N <==> null 4个参数(官方文档)

  • 一致性问题

hive => mysql 4map相互之间没有交互,并行执行,有可能既有成功又有失败;
数仓是为老板做决策提供依据的。指标可以算不出来,但是不能算错。

  1. Sqoop每天向HDFS导入多少数据?

100万日活为例(目前交易量也上每日百万了):100万 * 1KB => 1G (SDC RDBMS => Kudu)

  1. 每天订单表、支付表数据多少?(确实公司机密)

30张表,平均也是百万行了...所以做了数据归档、分库分表

数据量大的表是平均值的 2~5倍之间,每天几百M

  1. Sqoop在导数据的时候,是够遇到过数据倾斜?

Hadoop(Hive) Spark Flink Sqoop

没遇到,但还是可能发生数据倾斜(底层MR),一般发生数据倾斜的场景...

rownum() 生成一个严格均匀分布的字段(etl_id),然后指定为分割字段;split-by:按照某一列来切分表的工作单元;num-mappers:启动N个map来并行导入数据,默认4个。

  1. Sqoop Parquet文件

列式存储 ==> mysql直接导会是乱码;使用临时表tmp行式存储、官网转换参数;ads层默认text即可。

  1. 每天什么时候执行Sqoop任务?一般执行多久

凌晨,一般00:30分 看具体数据表和数据量,一般40分钟~50分钟

  1. Sqoop参数

举一反三;基本的JDBC参数;数据源 & 数据目标;指定表或query(自定义查询);空值处理;map个数;分隔符默认\001。

八、Azkaban

  1. Azkaban中每天跑多少指标,什么时间开始执行?

DolphinScheduler同上,出现过什么问题...

正常每天100个左右,节假日、活动日 200个;
00:30开始执行,08点之前必须结束(万一完成不了,增加机器、减少分析的指标)

  1. 执行发生故障怎么办?

发送邮件、电话告警(第三方智能告警平台 睿象云 restful接口) 24小时有人值班

https://azkaban.github.io/

https://github.com/azkaban/azkaban/releases

通过VPN连上公司服务器 => 任务调度重试失败后手动重试、查看日志紧急排查是否存在特殊原因

猜你喜欢



点一下,代码无 Bug

文章转载自大数据真有意思,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论