Mapreduce任务实现邮件监控
http://my.oschina.net/mkh/blog/493885
这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下
这里主要使用Java自带邮件类实现Mapreduce任务的监控,如果Mapreduce任务报错则发送报错邮件。Mapreduce的报错信息通过hdfs中的日志获取,里面的报错日志是json格式,这里讲json转换成xml格式发送到邮件。具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
Java代码 import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.URI; import java.util.Properties; import java.util.StringTokenizer; import javax.mail.Authenticator; import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import net.sf.json.xml.XMLSerializer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; public class Email { private static final String USERNAME = "123456@qq.com";//发送邮件的用户名 private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码 private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host public static void main(String args[]) { try { sendEmail("测试邮件", "测试邮件内容!", "test@qq.com"); System.out.println("email ok !"); } catch (MessagingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取 * @param to 目标邮箱(可以多个邮箱,用,号隔开) * @param job 通过mapreduce的job获取jobID * @param time 通过时间戳访问错误日志路径 * @throws Exception */ public static void sendErrMail(String to, Job job, String time) throws Exception { String subject = job.getJobName(); String message = getErr(job, time); LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(message, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写 * @param subject 主题 * @param body 内容 * @param to 目标邮箱 * @throws MessagingException */ public static void sendEmail(String subject, String body, String to) throws MessagingException { LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(body, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 获取日志文件 * @param job * @param time * @return FSDataInputStream * @throws IOException */ public static FSDataInputStream getFile(Job job, String time) throws IOException { String year = time.substring(0, 4); String month = time.substring(4, 6); String day = time.substring(6, 8); String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/" + year + "/" + month + "/" + day + "/000000"; FileSystem fs = FileSystem.get(URI.create(dst), new Configuration()); FileStatus[] status = fs.listStatus(new Path(dst)); FSDataInputStream in = null; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName() .contains(job.getJobID().toString()) && status[i].getPath().getName().endsWith("jhist")) { in = new FSDataInputStream(fs.open(status[i].getPath())); } } return in; } /** * @category 解析文件类容为xml * @param job * @param time * @return xml * @throws IOException * @throws InterruptedException */ public static String getErr(Job job, String time) throws IOException, InterruptedException { FSDataInputStream in = getFile(job, time); Thread t1 = new Thread(); while (in == null) { t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成 t1.join(); in = getFile(job, time); } BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = ""; JSONObject jo; JSONArray jsa = new JSONArray(); String xml = ""; XMLSerializer xmlSerializer = new XMLSerializer(); while ((line = br.readLine()) != null) { if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) { jo = JSONObject.fromObject(line); jsa.add(jo); } } xml = xmlSerializer.write(jsa); in.close(); br.close(); return xml; } /** * @category 获取try-catch中的异常内容 * @param e Exception * @return 异常内容 */ public static String getException(Exception e) { ByteArrayOutputStream out = new ByteArrayOutputStream(); PrintStream pout = new PrintStream(out); e.printStackTrace(pout); String ret = new String(out.toByteArray()); pout.close(); try { out.close(); } catch (Exception ex) { } return ret; } } class LoginMail extends Authenticator { private String username; private String password; public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } public LoginMail(String username, String password) { this.username = username; this.password = password; } } |