/**
 * Created by bob on 2017/2/6.
 *
 * <p>Driver and task classes for the "ModelToStore" MapReduce job. The mapper keeps a running
 * article counter that is loaded from the {@code fd_article_max_num} table before the first
 * record and written back to the same table after the last one.
 */
public class ModelToStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(ModelToStore.class);

    /**
     * Mapper that counts its input records on top of the value stored in
     * {@code fd_article_max_num.num}. It never writes to the context.
     *
     * <p>NOTE(review): every map task instance reads and rewrites the counter on its own, so
     * with more than one map task the tasks would overwrite each other's value. Confirm whether
     * the job is meant to run with a single map task or whether an atomic increment is needed.
     */
    public static class ModelToStoreMap extends Mapper<Object, Text, Text, Text> {
        private static final String SELECT_MAX_NUM_SQL = "SELECT num FROM fd_article_max_num";
        private static final String UPDATE_MAX_NUM_SQL = "UPDATE fd_article_max_num SET num=?";

        /** Counter value read in {@link #setup} plus one for every record seen by {@link #map}. */
        private int articleNum = 0;

        /**
         * Loads the current counter value from the database.
         *
         * <p>If the query returns several rows the last one wins; if it returns none the counter
         * stays at 0.
         *
         * @throws IOException if no connection is available or the query fails; failing here
         *     prevents {@link #cleanup} from overwriting the stored counter with a value that
         *     was never based on it
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Connection connection = openConnection();
            PreparedStatement statement = null;
            ResultSet rows = null;
            try {
                statement = connection.prepareStatement(SELECT_MAX_NUM_SQL);
                rows = statement.executeQuery();
                while (rows.next()) {
                    articleNum = rows.getInt("num");
                }
            } catch (SQLException e) {
                LOGGER.error("Failed to read num from fd_article_max_num", e);
                throw new IOException("Failed to read num from fd_article_max_num", e);
            } finally {
                closeQuietly(rows, "result set");
                closeQuietly(statement, "statement");
                closeQuietly(connection, "connection");
            }
            super.setup(context);
        }

        /**
         * Counts one input record. The key and value are not inspected and nothing is emitted.
         */
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            articleNum++;
        }

        /**
         * Writes the final counter value back to the database.
         *
         * @throws IOException if no connection is available or the update fails, so that a lost
         *     counter update fails the task instead of passing unnoticed
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Connection connection = openConnection();
            PreparedStatement statement = null;
            try {
                statement = connection.prepareStatement(UPDATE_MAX_NUM_SQL);
                statement.setInt(1, articleNum);
                statement.executeUpdate();
            } catch (SQLException e) {
                LOGGER.error("Failed to store num={} in fd_article_max_num", articleNum, e);
                throw new IOException("Failed to store num=" + articleNum + " in fd_article_max_num", e);
            } finally {
                closeQuietly(statement, "statement");
                closeQuietly(connection, "connection");
            }
            super.cleanup(context);
        }

        /** Obtains a connection from {@code DataSourceManager}, rejecting a {@code null} result. */
        private static Connection openConnection() throws IOException {
            Connection connection = DataSourceManager.getConnection();
            if (connection == null) {
                throw new IOException("DataSourceManager returned no database connection");
            }
            return connection;
        }

        /** Closes a JDBC resource if present; a failure to close is logged and otherwise ignored. */
        private static void closeQuietly(AutoCloseable resource, String description) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close JDBC {}", description, e);
            }
        }
    }

    /**
     * Reducer that adds no logic of its own: every call is forwarded to the base
     * {@code Reducer.reduce} implementation.
     */
    public static class ModelToStoreReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            super.reduce(key, values, context);
        }
    }

    /**
     * Configures and runs the job, then exits the JVM with 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options followed by {@code <numReduceTasks> <inputPath>
     *     <outputPath>}; an existing output path is deleted recursively before the job starts.
     *     With fewer than three remaining arguments a usage message is printed and the JVM
     *     exits with status 2.
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        JobConf jobConf = new JobConf();
        String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
        System.out.println("参数:" + Arrays.toString(otherArgs));
        if (otherArgs.length < 3) {
            System.err.println("Usage: ModelToStore [generic options] <numReduceTasks> <inputPath> <outputPath>");
            System.exit(2);
        }

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(jobConf, "ModelToStore");
        job.setJarByClass(ModelToStore.class);

        // Set the Map and Reduce processing classes.
        job.setMapperClass(ModelToStoreMap.class);
        job.setReducerClass(ModelToStoreReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Set the output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setNumReduceTasks(Integer.parseInt(otherArgs[0]));

        // Set the input and output directories.
        TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
        String outputLocation = otherArgs[2];
        Path outputPath = new Path(outputLocation);
        FileSystem fs = FileSystem.get(new URI(outputLocation), jobConf);
        // The output format refuses to write into an existing directory, so clear it first.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        TextOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
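// Article reposted from 小邱说. If you suspect infringement, please send an e-mail to contact@modb.pro to report it and provide the relevant evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.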




