Environment:
Hadoop cluster name: data-test
Hadoop node hostnames:
- hadoop01
- hadoop02
- hadoop03
Writing Parquet files to HDFS
Path file = new Path("hdfs:/test3.parquet");
String schemaStr = "message schema {optional int64 id; optional binary idc_id;}";
MessageType schema = MessageTypeParser.parseMessageType(schemaStr);
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://data-test");
configuration.set("dfs.nameservices","data-test");
configuration.set("dfs.ha.namenodes.data-test","nn1,nn2,nn3");
configuration.set("dfs.namenode.rpc-address.data-test.nn1","hadoop01:8020");
configuration.set("dfs.namenode.rpc-address.data-test.nn2","hadoop02:8020");
configuration.set("dfs.namenode.rpc-address.data-test.nn3","hadoop03:8020");
// Key setting: without it, the client tries to resolve "data-test" as a hostname and fails with a DNS/unknown-host error
configuration.set("dfs.client.failover.proxy.provider.data-test","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(file)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withConf(configuration)
        .withType(schema);
ParquetWriter<Group> writer = builder.build();
SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
long start = System.currentTimeMillis();
for (long i = 0; i < 10000000L; i++) {
    writer.write(groupFactory.newGroup().append("id", 1000L).append("idc_id", "xxx" + getSring()));
    if (i % 100000 == 0) {
        // log progress every 100k rows
        LOGGER.info("{} rows written", i);
    }
}
writer.close();
LOGGER.info("elapsed: {} ms", System.currentTimeMillis() - start);
System.err.println("write finished..");
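The loop above calls a getSring() helper that isn't shown here. Given the java.util.UUID import below, it presumably returns a random string; a minimal sketch under that assumption (name and behavior are assumed, not from the original):

// Hypothetical helper: returns a random UUID string so each idc_id value is distinct.
private static String getSring() {
    return UUID.randomUUID().toString();
}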
Reading Parquet files from HDFS
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://data-test");
configuration.set("dfs.nameservices","data-test");
configuration.set("dfs.ha.namenodes.data-test","nn1,nn2,nn3");
configuration.set("dfs.namenode.rpc-address.data-test.nn1","hadoop01:8020");
configuration.set("dfs.namenode.rpc-address.data-test.nn2","hadoop02:8020");
configuration.set("dfs.namenode.rpc-address.data-test.nn3","hadoop03:8020");
configuration.set("dfs.client.failover.proxy.provider.data-test","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
Path file = new Path("hdfs:/test3.parquet");
ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), file).withConf(configuration);
ParquetReader<Group> reader = builder.build();
SimpleGroup group = (SimpleGroup) reader.read();
while (group != null) {
    System.out.println("schema:" + group.getType().toString());
    // field index 1 = idc_id, repetition index 0
    System.out.println("idc_id:" + group.getString(1, 0));
    group = (SimpleGroup) reader.read();
}
reader.close();
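The positional accessor getString(1, 0) works but is brittle if the schema changes. Group also exposes name-based accessors, and ParquetReader is Closeable, so the same loop can be written as the following sketch (same file and configuration as above):

try (ParquetReader<Group> r = ParquetReader.builder(new GroupReadSupport(), file)
        .withConf(configuration)
        .build()) {
    Group g;
    while ((g = r.read()) != null) {
        // look the columns up by field name instead of position
        long id = g.getLong("id", 0);
        String idcId = g.getString("idc_id", 0);
        System.out.println(id + " -> " + idcId);
    }
}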
Maven dependencies
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.12.3</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.12.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.4</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.3.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.3.4</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
With these settings in place, you can read and write Parquet files against an HA-configured Hadoop cluster.
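If the cluster's client configuration files are available on the machine running this code, the HA settings don't have to be hardcoded at all; Configuration can load them directly. A sketch, assuming the common /etc/hadoop/conf location (adjust the paths for your deployment):

Configuration configuration = new Configuration();
// Picks up fs.defaultFS, dfs.nameservices, the NameNode RPC addresses and the
// failover proxy provider from the cluster's own client config files.
configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
configuration.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));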
Java imports
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.UUID;
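The snippets also reference a LOGGER that is never declared; presumably a standard SLF4J logger along these lines (class name hypothetical):

// Assumed declaration; not shown in the original.
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetHdfsDemo.class);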