Accessing files on HDFS with the Java API: http://blog.csdn.net/zhangzhaokun/article/details/5597433
Detailed reference: http://blog.csdn.net/lastsweetop/article/details/9001467
package com.pandy.hadoop.maprender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.*;
import java.net.URI;

/**
 * Project: workspace
 * Purpose: basic HDFS file operations via the Java API
 * Author: Pandy
 * Email: panyongzheng@163.com, 1453261799@qq.com
 * Created: 16-12-9, 3:56 PM
 */
public class HdfsAccessTest {

    private static String HDFS_URL = "hdfs://192.168.0.31:9000";

    public static void main(String[] args) throws Exception {
        try {
            deleteFromHdfs("/tmp/input/001/001.txt");
            uploadToHdfs("/home/pandy/tmp/words_01.txt", "/tmp/input/001/001.txt");
            // Download the file just uploaded; write to a new local name so the
            // original local source file is not overwritten.
            downloadFromHdfs("/tmp/input/001/001.txt", "/home/pandy/tmp/001_copy.txt");
            listDirFromHdfs("/tmp");
            //appendToHdfs();
            // Only report success if nothing above threw.
            System.out.println("SUCCESS");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Upload a local file to HDFS.
     */
    private static void uploadToHdfs(String localSrc, String hdfsSrc) throws IOException {
        String dst = HDFS_URL + hdfsSrc;
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {
                // Called periodically to report write progress.
                System.out.print(".");
            }
        });
        // copyBytes with closeStreams=true closes both streams when done.
        IOUtils.copyBytes(in, out, 4096, true);
        fs.close();
    }

    /**
     * Download a file from HDFS to the local file system.
     */
    private static void downloadFromHdfs(String hdfsSrc, String localDst) throws IOException {
        String dst = HDFS_URL + hdfsSrc;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(dst));
        OutputStream out = new FileOutputStream(localDst);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while (-1 != readLen) {
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
        fs.close();
    }

    /**
     * Append content to the end of an existing HDFS file.
     * Note: on older Hadoop versions append must be enabled in hdfs-site.xml:
     * <property><name>dfs.support.append</name><value>true</value></property>
     */
    private static void appendToHdfs() throws IOException {
        String dst = HDFS_URL + "/user/zhangzk/qq.txt";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataOutputStream out = fs.append(new Path(dst));
        // Write the payload once; the original looped forever because
        // readLen was never updated inside the while loop.
        byte[] bytes = "zhangzk add by hdfs java api".getBytes();
        out.write(bytes, 0, bytes.length);
        out.close();
        fs.close();
    }

    /**
     * Delete a file from HDFS.
     */
    private static void deleteFromHdfs(String hdfsSrc) throws IOException {
        String dst = HDFS_URL + hdfsSrc;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        // deleteOnExit removes the path when fs.close() is called below.
        fs.deleteOnExit(new Path(dst));
        fs.close();
    }

    /**
     * List the files and directories under an HDFS path.
     */
    private static void listDirFromHdfs(String hdfsSrc) throws IOException {
        String dst = HDFS_URL + hdfsSrc;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FileStatus[] fileList = fs.listStatus(new Path(dst));
        for (FileStatus status : fileList) {
            System.out.println("name: " + status.getPath().getName()
                    + "  length: " + status.getLen());
        }
        fs.close();
    }
}
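For plain upload and download, FileSystem also provides copyFromLocalFile and copyToLocalFile, which avoid hand-rolled stream loops and close their internal streams for you. Below is a minimal sketch reusing the paths from the class above; the class name HdfsCopySketch is hypothetical, and try-with-resources guarantees the FileSystem is closed even if a copy fails.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class HdfsCopySketch {
    public static void main(String[] args) throws Exception {
        String hdfsUrl = "hdfs://192.168.0.31:9000";
        Configuration conf = new Configuration();
        // try-with-resources closes the FileSystem when the block exits.
        try (FileSystem fs = FileSystem.get(URI.create(hdfsUrl), conf)) {
            // Upload: copies the local file into HDFS in one call.
            fs.copyFromLocalFile(new Path("/home/pandy/tmp/words_01.txt"),
                                 new Path("/tmp/input/001/001.txt"));
            // Download: copies the HDFS file back to the local file system.
            fs.copyToLocalFile(new Path("/tmp/input/001/001.txt"),
                               new Path("/home/pandy/tmp/001_copy.txt"));
        }
    }
}

One design note: these helpers are convenient for whole-file transfers, while the explicit fs.create/fs.open streams in the class above remain the right tool when you need progress callbacks or custom buffering.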