Reading, Writing, and Merging Sequence Files in a Hadoop Environment

I. Reading and Writing Sequence Files

For an introduction to the Sequence File format, see the article on HDFS file storage formats and compression algorithms in this column; for more detail, consult other resources.

1. Sequence File format

Depending on the compression type, there are three Sequence File formats: uncompressed, record-compressed, and block-compressed.

A Sequence File consists of a header followed by one or more records.

All three formats share the same header structure:

The first 3 bytes are SEQ, marking the file as a sequence file, followed by one byte giving the actual version number (e.g., SEQ4 or SEQ6).

The header also contains the key and value class names, compression details, user metadata, and a sync marker.
The sync marker is what allows a reader to start reading from an arbitrary position in the file.
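
To see the header in practice, here is a minimal sketch that reads the magic bytes and version byte directly. The path and Configuration are assumptions; point it at any existing sequence file, such as the /testseq/test.seq file written in section 2 below:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeqHeaderCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // "/testseq/test.seq" is an example path, not a fixed location
        try (FSDataInputStream in = fs.open(new Path("/testseq/test.seq"))) {
            byte[] magic = new byte[3];
            in.readFully(magic);          // should be the ASCII bytes 'S' 'E' 'Q'
            byte version = in.readByte(); // version byte, e.g. 6 for current files
            System.out.println(new String(magic, StandardCharsets.US_ASCII) + " v" + version);
        }
    }
}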

1) Uncompressed format

An uncompressed Sequence File is made up of a header, records, and sync markers.

Each record consists of four parts: record length, key length, key, and value.

A sync marker is inserted every few records (every few hundred bytes or so).

2) Record-compressed format

A record-compressed Sequence File is likewise made up of a header, records, and sync markers.

Each record consists of four parts: record length, key length, key, and compressed value.

A sync marker is inserted every few records (every few hundred bytes or so).

3) Block-compressed format

A block-compressed Sequence File is made up of a header, blocks, and sync markers.

Here, block means a record block, i.e., a group of records packed together; it is a different concept from the HDFS storage block (128 MB by default).

Each block contains: the record count, the compressed key lengths, the compressed keys, the compressed value lengths, and the compressed values. A sync marker is written before every block.

Block compression gives a better compression ratio than record compression, so it is usually the preferred choice when working with Sequence Files.
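
A minimal sketch (assuming the same Configuration and imports as the SequenceFileRW class in the next section; the method name and path are examples): selecting block compression only means passing CompressionType.BLOCK when building the writer:

public static void writeBlockCompressed(Configuration conf, Path path) throws Exception {
    // Identical to the record-compressed writer below, except for CompressionType.BLOCK
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(path),
            SequenceFile.Writer.keyClass(IntWritable.class),
            SequenceFile.Writer.valueClass(Text.class),
            SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK,
                    ReflectionUtils.newInstance(GzipCodec.class, conf)));
    try {
        Text value = new Text("block-compressed record");
        for (int i = 0; i < 100; i++) {
            writer.append(new IntWritable(i), value);
        }
    } finally {
        IOUtils.closeStream(writer);
    }
}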

2. Reading and writing a Sequence File

1) pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
</dependencies>

2) Implementation class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileRW {

    private static final String[] DATA = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws Exception {
        // Identity the HDFS client runs as
        System.setProperty("HADOOP_USER_NAME", "alanchan");

        // Configuration carries the client-side parameters;
        // these set up the HA nameservice and its two NameNodes
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
        conf.set("dfs.nameservices", "HadoopHAcluster");
        conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
        conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
        conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        Path path = new Path("/testseq/test.seq");
        write(conf, path);
        read(conf, path);
    }

    public static void write(Configuration conf, Path path) throws Exception {
        // Reusable key/value instances for the sequence file
        IntWritable key = new IntWritable();
        Text value = new Text();

        // Build the Writer options: target file, key/value classes, compression.
        // Instantiating the codec via ReflectionUtils ensures it gets the Configuration.
        SequenceFile.Writer writer = null;
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
        SequenceFile.Writer.Option optCom = SequenceFile.Writer
                .compression(SequenceFile.CompressionType.RECORD, codec);
        try {
            writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                // getLength() is the byte offset at which this record will start
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
        System.out.println("done writing");
    }

    public static void read(Configuration conf, Path path) throws Exception {
        SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(path);
        // Only read the first 374 bytes of the file (an arbitrary demo length)
        SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(374);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, option1, option2);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                // "*" marks records whose read passed a sync marker
                String syncSeen = reader.syncSeen() ? "*" : "";
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
                position = reader.getPosition(); // beginning of next record
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
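
The sync markers written above are what make mid-file reads possible. A minimal sketch (a hypothetical extra method for the class above, not part of the original): jump to the first sync point at or after a given byte offset, then read normally from there:

public static void readFrom(Configuration conf, Path path, long offset) throws Exception {
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(path))) {
        reader.sync(offset); // position the reader at the next sync mark past offset
        IntWritable key = new IntWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.printf("[%s]\t%s\t%s\n", reader.getPosition(), key, value);
        }
    }
}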

II. Merging Small Files

Write all the small files into a single Sequence File: the file path becomes the key and the file content becomes the value, serialized together into one large Sequence File, as in the class below.

import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

public class SmallFilesMergeBySequenceFile {

    private List<String> smallFilePaths = new ArrayList<String>();

    // Collect the paths of the small files; a directory is expanded to its files
    public void addInputPath(String path) throws Exception {
        File file = new File(path);
        if (file.isDirectory()) {
            File[] files = FileUtil.listFiles(file);
            for (File sFile : files) {
                smallFilePaths.add(sFile.getPath());
                System.out.println("added small file path: " + sFile.getPath());
            }
        } else {
            smallFilePaths.add(file.getPath());
            System.out.println("added small file path: " + file.getPath());
        }
    }

    public void mergeFile(Configuration configuration, Path path) throws Exception {
        Writer.Option bigFile = Writer.file(path);
        Writer.Option keyClass = Writer.keyClass(Text.class);
        Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
        Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
        Text key = new Text();
        for (String sfps : smallFilePaths) {
            File file = new File(sfps);
            long fileSize = file.length();
            byte[] fileContent = new byte[(int) fileSize];
            try (FileInputStream inputStream = new FileInputStream(file)) {
                inputStream.read(fileContent, 0, (int) fileSize);
            }
            // Print the MD5 of the content so the round trip can be verified on read
            String md5Str = DigestUtils.md5Hex(fileContent);
            System.out.println("merging small file: " + sfps + ", md5: " + md5Str);
            key.set(sfps);
            // File path as key, file content as value
            writer.append(key, new BytesWritable(fileContent));
        }
        writer.hflush();
        writer.close();
    }

    public void readMergedFile(Configuration configuration, Path path) throws Exception {
        Reader.Option file = Reader.file(path);
        try (Reader reader = new Reader(configuration, file)) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                byte[] bytes = value.copyBytes();
                String md5 = DigestUtils.md5Hex(bytes);
                // Assumes the original small files are GBK-encoded text
                String content = new String(bytes, Charset.forName("GBK"));
                System.out.println("read file: " + key + ", md5: " + md5 + ", content: " + content);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        SmallFilesMergeBySequenceFile msf = new SmallFilesMergeBySequenceFile();
        Path path = new Path("");  // target sequence file; fill in before running
        msf.addInputPath("");      // directory (or file) of small files; fill in before running
        msf.mergeFile(configuration, path);
        msf.readMergedFile(configuration, path);
    }
}
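
Because the key is the original file path, a single small file can be recovered from the merged file by scanning for its key. A minimal sketch (a hypothetical helper method for the class above, not part of the original):

public byte[] extractFile(Configuration configuration, Path path, String smallFilePath) throws Exception {
    try (Reader reader = new Reader(configuration, Reader.file(path))) {
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        while (reader.next(key, value)) {
            if (key.toString().equals(smallFilePath)) {
                return value.copyBytes(); // the original file bytes
            }
        }
    }
    return null; // no entry with that path
}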
