之前工作中,有接触到大数据的需求,虽然当时我们体系有专门的大数据部门,但是由于当时我们中台重构,整个体系的开发量巨大,共用一个大数据部门,人手已经忙不过来,没法办,为了赶时间,我自己负责的系统的大数据相关操作,由我们自己承担了。此前对大数据的知识了解的很少,于是晚上回去花时间突击大数据知识,白天就开始上手干,一边学一边做,总算在部门规定的时间,跟系统一起上线了。后来的维护迭代就交给大数据去了,虽然接触大数据的时间不长,但是对我来说,确是很有意思的一段经历,觉得把当时匆匆学的知识点,再仔细回顾回顾,整理下。
01 大数据概述大数据: 就是对海量数据进行分析处理,得到一些有价值的信息,然后帮助企业做出判断和决策.
处理流程:
Hadoop是一个分布式系基础框架,它允许使用简单的编程模型跨大型计算机的大型数据集进行分布式处理.它主要解决两个问题
假设一个文件非常非常大,大小为1PB/a.txt, 大到世界上所有的高级计算机都存储不下, 怎么办?
2.3 问题三: 如何将这些计算任务跑在集群中?从一个网络日志文件中计算独立 IP, 以及其出现的次数
如果数据量特别大,我们可以将,整个任务拆开, 划分为比较小的任务, 从而进行计算呢。
如果能够在不同的节点上并行执行, 更有更大的提升, 如何把这些任务跑在集群中?
——分布式文件系统(GFS),可用于处理海量网页的存储
——分布式计算框架MapReduce,可用于处理海量网页的索引计算问题。
狭义上来说,hadoop就是单独指代hadoop这个软件,
HDFS :分布式文件系统
MapReduce : 分布式计算系统
广义上来说,hadoop指代大数据的一个生态圈,包括很多其他的软件
2.6 hadoop的架构模型1.x的版本架构模型介绍
文件系统核心模块:
NameNode:集群当中的主节点,管理元数据(文件的大小,文件的位置,文件的权限),主要用于管理集群当中的各种数据
secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理
DataNode:集群当中的从节点,主要用于存储集群当中的各种数据
数据计算核心模块:
JobTracker:接收用户的计算请求任务,并分配任务给从节点
TaskTracker:负责执行主节点JobTracker分配的任务
2.x的版本架构模型介绍
第一种:NameNode与ResourceManager单节点架构模型
文件系统核心模块:
NameNode:集群当中的主节点,主要用于管理集群当中的各种数据
secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理
DataNode:集群当中的从节点,主要用于存储集群当中的各种数据
数据计算核心模块:
ResourceManager:接收用户的计算请求任务,并负责集群的资源分配
NodeManager:负责执行主节点APPmaster分配的任务
第二种:NameNode单节点与ResourceManager高可用架构模型
文件系统核心模块:
NameNode:集群当中的主节点,主要用于管理集群当中的各种数据
secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理
DataNode:集群当中的从节点,主要用于存储集群当中的各种数据
数据计算核心模块:
ResourceManager:接收用户的计算请求任务,并负责集群的资源分配,以及计算任务的划分,通过zookeeper实现ResourceManager的高可用
NodeManager:负责执行主节点ResourceManager分配的任务
第三种:NameNode高可用与ResourceManager单节点架构模型
文件系统核心模块:
NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,其中nameNode可以有两个,形成高可用状态
DataNode:集群当中的从节点,主要用于存储集群当中的各种数据
JournalNode:文件系统元数据信息管理
数据计算核心模块:
ResourceManager:接收用户的计算请求任务,并负责集群的资源分配,以及计算任务的划分
NodeManager:负责执行主节点ResourceManager分配的任务
第四种:NameNode与ResourceManager高可用架构模型
文件系统核心模块:
NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,一般都是使用两个,实现HA高可用
JournalNode:元数据信息管理进程,一般都是奇数个
DataNode:从节点,用于数据的存储
数据计算核心模块:
ResourceManager:Yarn平台的主节点,主要用于接收各种任务,通过两个,构建成高可用
NodeManager:Yarn平台的从节点,主要用于处理ResourceManager分配的任务
03 Hadoop 核心介绍3.1 HDFSHDFS(Hadoop Distributed File System) 是一个 Apache Software Foundation 项目, 是 Apache Hadoop 项目的一个子项目. Hadoop 非常适于存储大型数据 (比如 TB 和 PB), 其就是使用 HDFS 作为存储系统. HDFS 使用多台计算机存储文件, 并且提供统一的访问接口, 像是访问一个普通文件系统一样使用分布式文件系统. HDFS 对数据文件的访问通过流的方式进行处理, 这意味着通过命令和 MapReduce 程序的方式可以直接使用 HDFS. HDFS 是容错的, 且提供对大数据集的高吞吐量访问.
HDFS 的一个非常重要的特点就是一次写入、多次读取, 该模型降低了对并发控制的要求, 简化了数据聚合性, 支持高吞吐量访问. 而吞吐量是大数据系统的一个非常重要的指标, 吞吐量高意味着能处理的数据量就大.
3.1.1 设计目标所有的文件都是以 block 块的方式存放在 HDFS 文件系统当中, 在 Hadoop1 当中, 文件的 block 块默认大小是64M, hadoop2 当中, 文件的 block 块大小默认是 128M, block 块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定
<property>
<name>dfs.block.size</name>
<value>块大小 以字节为单位</value>
</property>
(1)引入块机制的好处
(2)块缓存
通常 DataNode 从磁盘中读取块, 但对于访问频繁的文件, 其对应的块可能被显式的缓存在 DataNode 的内存中, 以堆外块缓存的形式存在. 默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量. 作业调度器通过在缓存块的 DataNode 上运行任务, 可以利用块缓存的优势提高读操作的性能.
例如:
连接(join) 操作中使用的一个小的查询表就是块缓存的一个很好的候选
用户或应用通过在缓存池中增加一个 Cache Directive 来告诉 NameNode 需要缓存哪些文件及存多久. 缓存池(Cache Pool) 是一个拥有管理缓存权限和资源使用的管理性分组.
例如一个文件 130M, 会被切分成 2 个 block 块, 保存在两个 block 块里面, 实际占用磁盘 130M 空间, 而不是占用256M的磁盘空间
(3)HDFS 文件权限验证
HDFS 的文件权限机制与 Linux 系统的文件权限机制类似
r:read w:write x:execute
权限 x 对于文件表示忽略, 对于文件夹表示是否有权限访问其内容 如果 Linux 系统用户 zhangsan 使用 Hadoop 命令创建一个文件, 那么这个文件在 HDFS 当中的 Owner 就是 zhangsan HDFS 文件权限的目的, 防止好人做错事, 而不是阻止坏人做坏事. HDFS相信你告诉我你是谁, 你就是谁
3.1.5 HDFS 的元信息和 SecondaryNameNode当 Hadoop 的集群当中, 只有一个 NameNode 的时候, 所有的元数据信息都保存在了 FsImage 与 Eidts 文件当中, 这两个文件就记录了所有的数据的元数据信息, 元数据信息的保存目录配置在了 hdfs-site.xml 当中
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-3.1.1/datas/namenode/namenodedatas</value>
</property>
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///export/servers/hadoop-3.1.1/datas/dfs/nn/edits</value>
</property>
(1) FsImage 和 Edits 详解
(2)fsimage 中的文件信息查看
官方查看文档
使用命令 hdfs oiv
cd /export/servers/hadoop-3.1.1/datas/namenode/namenodedatas
hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml
(3) edits 中的文件信息查看
官方查看文档
使用命令 hdfs oev
cd /export/servers/hadoop-3.1.1/datas/dfs/nn/edits
hdfs oev -i edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML
(4) SecondaryNameNode 如何辅助管理 fsimage 与 edits 文件?
(5)特点
(1)导入 Maven 依赖
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
(2)概述
在 Java 中操作 HDFS, 主要涉及以下 Class:
(3)获取 FileSystem 的几种方式
@Test
public void getFileSystem() throws URISyntaxException, IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), configuration);
System.out.println(fileSystem.toString());
}
@Test
public void getFileSystem2() throws URISyntaxException, IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020");
FileSystem fileSystem = FileSystem.get(new URI("/"), configuration);
System.out.println(fileSystem.toString());
}
@Test
public void getFileSystem3() throws URISyntaxException, IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://192.168.52.250:8020"), configuration);
System.out.println(fileSystem.toString());
}
@Test
public void getFileSystem4() throws Exception{
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020");
FileSystem fileSystem = FileSystem.newInstance(configuration);
System.out.println(fileSystem.toString());
}
(4)遍历 HDFS 中所有文件
@Test
public void listFile() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.100:8020"), new Configuration());
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
for (FileStatus fileStatus : fileStatuses) {
if(fileStatus.isDirectory()){
Path path = fileStatus.getPath();
listAllFiles(fileSystem,path);
}else{
System.out.println("文件路径为" fileStatus.getPath().toString());
}
}
}
public void listAllFiles(FileSystem fileSystem,Path path) throws Exception{
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if(fileStatus.isDirectory()){
listAllFiles(fileSystem,fileStatus.getPath());
}else{
Path path1 = fileStatus.getPath();
System.out.println("文件路径为" path1);
}
}
}
@Test
public void listMyFiles()throws Exception{
//获取fileSystem类
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
//获取RemoteIterator 得到所有的文件或者文件夹,第一个参数指定遍历的路径,第二个参数表示是否要递归遍历
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
while (locatedFileStatusRemoteIterator.hasNext()){
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
System.out.println(next.getPath().toString());
}
fileSystem.close();
}
(5)下载文件到本地
@Test
public void getFileToLocal()throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log"));
FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\install.log"));
IOUtils.copy(open,fileOutputStream );
IOUtils.closeQuietly(open);
IOUtils.closeQuietly(fileOutputStream);
fileSystem.close();
}
(6)HDFS 上创建文件夹
@Test
public void mkdirs() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test"));
fileSystem.close();
}
(7)HDFS 文件上传
@Test
public void putData() throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration());
fileSystem.copyFromLocalFile(new Path("file:///c:\\install.log"),new Path("/hello/mydir/test"));
fileSystem.close();
}
(8)伪造用户
cd /export/servers/hadoop-3.1.1
sbin/stop-dfs.sh
cd /export/servers/hadoop-3.1.1/etc/hadoop
vim hdfs-site.xml
<property>
<name>dfs.permissions.enabled</name>
<value>true</value>
</property>
scp hdfs-site.xml node02:$PWD
scp hdfs-site.xml node03:$PWD
cd /export/servers/hadoop-3.1.1
sbin/start-dfs.sh
cd /export/servers/hadoop-3.1.1/etc/hadoop
hdfs dfs -mkdir /config
hdfs dfs -put *.xml /config
hdfs dfs -chmod 600 /config/core-site.xml
@Test
public void getConfig()throws Exception{
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml"));
fileSystem.close();
}
(9)小文件合并
由于 Hadoop 擅长存储大文件,因为大文件的元数据信息比较少,如果 Hadoop 集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理
在我们的 HDFS 的 Shell 命令模式下,可以通过命令行将很多的 hdfs 文件合并成一个大文件下载到本地
cd /export/servers
hdfs dfs -getmerge /config/*.xml ./hello.xml
既然可以在下载的时候将这些小文件合并成一个大文件一起下载,那么肯定就可以在上传的时候将小文件合并到一个大文件里面去
@Test
public void mergeFile() throws Exception{
//获取分布式文件系统
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop");
FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml"));
//获取本地文件系统
LocalFileSystem local = FileSystem.getLocal(new Configuration());
//通过本地文件系统获取文件列表,为一个集合
FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:\\传智播客大数据离线阶段课程资料\\3、大数据离线第三天\\上传小文件合并"));
for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = local.open(fileStatus.getPath());
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
}
IOUtils.closeQuietly(outputStream);
local.close();
fileSystem.close();
}
04 MapReduce介绍
MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
MapReduce运行在yarn集群
这两个阶段合起来正是MapReduce思想的体现。
4.1 MapReduce设计思想和架构MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
Hadoop MapReduce构思:
分而治之
对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!
统一构架,隐藏系统层细节
如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统
层面的处理细节。
MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么(what need to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架。程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。
构建抽象模型:Map和Reduce
MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce
处理的数据类型是键值对。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
Map: (k1; v1) → [(k2; v2)]
Reduce: (k2; [v2]) → [(k3; v3)]
MapReduce 框架结构
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4个步骤,Reduce 阶段分为 2 个步骤
Map 阶段 2 个步骤
Shuffle 阶段 4 个步骤
Reduce 阶段 2 个步骤
转换为代码,例子如下
Map阶段
public class WordCountMapper extends Mapper<Text,Text,Text, LongWritable> {
/**
* K1-----V1
* A -----A
* B -----B
* C -----C
*
* K2-----V2
* A -----1
* B -----1
* C -----1
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key,new LongWritable(1));
}
}
Reduce阶段
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0L;
for (LongWritable value : values) {
count = value.get();
}
context.write(key, new LongWritable(count));
}
}
shuffle阶段,举一个分区的例子:
public class WordCountPartitioner extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
if (text.toString().length() > 5) {
return 1;
}
return 0;
}
}
主方法
public class JobMain extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(),new JobMain(),args);
}
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(super.getConf(), "wordcout");
job.setJarByClass(JobMain.class);
//输入
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/"));
//map
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//shuffle阶段
job.setPartitionerClass(WordCountPartitioner.class);
job.setNumReduceTasks(2);
//reduce阶段
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//输出
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("/"));
return 0;
}
}
4.4 MapTask运行机制
具体步骤:
Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理
详细步骤:
map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle
shuffle: 洗牌、发牌 ——(核心机制:数据分区,排序,分组,规约,合并等过程)
缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M
原文链接:https://www.cnblogs.com/whgk/p/14542608.html
Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved