参考: http://www.voidcn.com/blog/linux_ja/article/p-4507997.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
<!-- HBase client -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.3</version>
    <!-- Sqoop 1.4.6 requires HBase 1.0 or earlier -->
    <!--<version>1.0.0</version>-->
</dependency>
<!-- Sqoop (Hadoop 2.x build) -->
<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-hadoop200</artifactId>
    <version>1.4.4</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.40</version>
</dependency>
hbase-site.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Look these values up in Ambari's HBase config -->
<configuration>
    <property>
        <name>hbase.master</name>
        <value>192.168.0.61:60000</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>h1</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
</configuration>
有必要的话使用转换类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
package com.pandy.hadoop.sqoop;

import com.cloudera.sqoop.hbase.PutTransformer;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.*;

/**
 * Custom Sqoop-to-HBase {@link PutTransformer} that builds a composite HBase
 * row key from the configured row-key columns (e.g. {@code
 * MP4_FILE_ID,CODE,WIDGET_ID}), padding each component to a fixed width so
 * rows sort predictably in HBase.
 *
 * <p>Activated via the {@code sqoop.hbase.insert.put.transformer.class}
 * configuration property.
 */
public class DdtMapInfoTransFormat extends PutTransformer {

    /** Cache of column name -> serialized bytes, to avoid re-encoding per row. */
    private final Map<String, byte[]> serializedFieldNames = new TreeMap<String, byte[]>();

    /** Left-pads {@code str} to {@code size} characters with {@code padStr}. */
    public String strLPad(String str, int size, String padStr) {
        return StringUtils.leftPad(str, size, padStr);
    }

    /** Zero-pads {@code num} to a fixed width of {@code len} digits. */
    public String intLPadZore(int num, int len) {
        return String.format("%0" + len + "d", num);
    }

    /**
     * Return the serialized bytes for a field name, using the cache if it's
     * already in there.
     */
    private byte[] getFieldNameBytes(String fieldName) {
        byte[] cachedName = serializedFieldNames.get(fieldName);
        if (null != cachedName) {
            return cachedName;
        }
        byte[] nameBytes = Bytes.toBytes(fieldName);
        serializedFieldNames.put(fieldName, nameBytes);
        return nameBytes;
    }

    /** {@inheritDoc} */
    @Override
    public List<Put> getPutCommand(Map<String, Object> fields) throws IOException {
        String rowKeyCol = getRowKeyColumn(); // comma-separated row-key columns
        String colFamily = getColumnFamily();

        String[] rowKeyColArray = rowKeyCol.split(",");
        StringBuilder rowKey = new StringBuilder();
        for (int i = 0; i < rowKeyColArray.length; i++) {
            if (i > 0) {
                rowKey.append(","); // separator between key components
            }
            String col = rowKeyColArray[i];
            Object value = fields.get(col);
            // Fixed-length components keep the composite key lexicographically
            // ordered.
            if ("MP4_FILE_ID".equalsIgnoreCase(col) || "WIDGET_ID".equalsIgnoreCase(col)) {
                // BUG FIX: the original tested the column NAME for null
                // (always non-null after split) instead of the column VALUE,
                // so a missing value caused a NullPointerException here.
                int num = (value == null) ? 0 : Integer.parseInt(value.toString());
                rowKey.append(intLPadZore(num, 10));
            } else if ("CODE".equalsIgnoreCase(col)) {
                // Missing CODE degrades to all-pad instead of throwing NPE.
                rowKey.append(strLPad(value == null ? "" : value.toString(), 15, "-"));
            } else if (value != null) {
                rowKey.append(value.toString());
            }
        }

        if (rowKey.length() == 0) {
            // Empty row key: skip this record (null means "no puts" to Sqoop).
            return null;
        }

        byte[] colFamilyBytes = Bytes.toBytes(colFamily);
        Put put = new Put(Bytes.toBytes(rowKey.toString()));
        for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
            Object val = fieldEntry.getValue();
            if (null != val) {
                put.add(colFamilyBytes, getFieldNameBytes(fieldEntry.getKey()),
                        Bytes.toBytes(val.toString()));
            }
        }
        return Collections.singletonList(put);
    }
}
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
package com.pandy.hadoop.sqoop;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;

import java.io.File;

/**
 * Drives a Sqoop import from MySQL into HBase.
 *
 * <p>Uses the hbase-site.xml found on the classpath. Table auto-creation may
 * fail across HBase versions, so create the HBase table manually first,
 * otherwise the import fails.
 */
public class SqoopTest {

    /** Recursively deletes stale Sqoop codegen output so re-runs start clean. */
    private static void deleteRecursively(File file) {
        if (!file.exists()) {
            return;
        }
        File[] children = file.listFiles(); // null for regular files
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }

    public static void main(String[] args) throws Exception {
        // BUG FIX: File.delete() silently fails on a non-empty directory;
        // remove the previous codegen output recursively instead.
        deleteRecursively(new File("/home/pandy/workspace/ZH5_WIDGET"));
        deleteRecursively(new File("/home/pandy/workspace/ZH5_WIDGET.java"));

        SqoopOptions options = new SqoopOptions();
        options.setConnectString("jdbc:mysql://192.168.0.222:3306/pandy_psi");
        options.setDriverClassName("com.mysql.jdbc.Driver");
        options.setTableName("ZH5_WIDGET");
        //options.setWhereClause("id > 0"); // works when importing a whole table via setTableName()
        options.setUsername("pandy");
        options.setPassword("pandy");
        // BUG FIX: direct mode must be OFF when importing into HBase (the
        // original comment said so but the code passed true).
        options.setDirectMode(false);
        options.setNumMappers(1); // default is 4
        // BUG FIX: "\\$CONDITIONS" embedded a literal backslash in the SQL;
        // Sqoop expects the plain $CONDITIONS placeholder in free-form queries.
        options.setSqlQuery("SELECT * FROM ZH5_WIDGET WHERE 1=1 AND $CONDITIONS");
        options.setSplitByCol("WIDGET_ID");
        // Restrict the imported columns if needed:
        //options.setColumns(new String[]{"MP4_FILE_ID", "CODE", "WIDGET_ID"});

        // HBase options
        options.setHBaseTable("ZH5_WIDGET");
        options.setHBaseColFamily("def");
        options.setCreateHBaseTable(true); // create the HBase table if it does not exist
        // Columns combined into the HBase row key; the custom transformer
        // configured below has the final say on the key format.
        options.setHBaseRowKeyColumn("MP4_FILE_ID,CODE,WIDGET_ID");

        Configuration config = new Configuration();
        // Custom row-key generation rule. If this setting has no effect here,
        // configure it in sqoop's conf files and restart sqoop.
        config.set("sqoop.hbase.insert.put.transformer.class",
                "com.pandy.hadoop.sqoop.DdtMapInfoTransFormat");
        config.setBoolean("sqoop.hbase.add.row.key", true);
        options.setConf(config);
        options.setFetchSize(2000);

        int ret = new ImportTool().run(options);
        // Surface the tool's status instead of discarding it.
        System.out.println("ImportTool finished with status " + ret);
    }
}
问题:
A: Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;), 出现这个问题通常是 HBase 客户端版本二进制不兼容导致的: Sqoop 1.4.x 是针对旧版 HBase 编译的, 而 HTableDescriptor.addFamily 的方法签名在 HBase 1.1+ 中发生了变化(参考上面依赖配置中"Sqoop-1.4.6需要使用Hbase-1.0以前的版本"的说明). 另外这些代码无法自动创建表, 所以要先手动创建表, 否则导入会失败.