暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

MapReduce从mysql中读取配置文件

小邱说 2017-02-08
131
/**
* Created by bob on 2017/2/6.
*/
public class ModelToStore {
private static final Logger LOGGER = LoggerFactory.getLogger(ModelToStore.class);



   public static class ModelToStoreMap extends Mapper<Object, Text, Text, Text> {

private int articleNum=0;
       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
Connection connection = DataSourceManager.getConnection();
           PreparedStatement preparedStatement = null;
           ResultSet resultSet = null;
           try {
preparedStatement = connection.prepareStatement("SELECT num FROM fd_article_max_num");
                resultSet = preparedStatement.executeQuery();
               while (resultSet.next()) {
articleNum = resultSet.getInt("num");
               }

} catch (SQLException e) {
e.printStackTrace();
           }finally {
if (resultSet != null) {
try {
resultSet.close();
                   } catch (SQLException e) {
e.printStackTrace();
                   }
}
if (preparedStatement != null) {
try {
preparedStatement.close();
                   } catch (SQLException e) {
e.printStackTrace();
                   }
}
try {
if (connection != null) {
connection.close();
                   }

} catch (SQLException e) {
e.printStackTrace();
               }
}

super.setup(context);
       }

@Override
       protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
articleNum++;
       }


@Override
       protected void cleanup(Context context) throws IOException, InterruptedException {
Connection connection = DataSourceManager.getConnection();
           PreparedStatement preparedStatement = null;
           try {
preparedStatement = connection.prepareStatement("UPDATE fd_article_max_num SET num=?");
               preparedStatement.setInt(1,articleNum);
               preparedStatement.executeUpdate();
           } catch (SQLException e) {
e.printStackTrace();
           }finally {
if (preparedStatement != null) {
try {
preparedStatement.close();
                   } catch (SQLException e) {
e.printStackTrace();
                   }
}
try {
if (connection != null) {
connection.close();
                   }
} catch (SQLException e) {
e.printStackTrace();
               }
}

super.cleanup(context);
       }
}


public static class ModelToStoreReducer extends Reducer<Text,Text,Text,Text> {
@Override
       protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
super.reduce(key, values, context);
       }
}

public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
JobConf jobConf = new JobConf();
       String[] otherArgs = new GenericOptionsParser(jobConf, args).getRemainingArgs();
       System.out.println("参数:"+ Arrays.toString(otherArgs));
       Job job = new Job(jobConf, "ModelToStore");
       job.setJarByClass(ModelToStore.class);
       //设置Map、Combine和Reduce处理类
       job.setMapperClass(ModelToStoreMap.class);
       job.setReducerClass(ModelToStoreReducer.class);

       job.setInputFormatClass(TextInputFormat.class);
       job.setOutputFormatClass(TextOutputFormat.class);
       //设置输出类型
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);
       job.setMapOutputKeyClass(Text.class);
       job.setNumReduceTasks(Integer.parseInt(otherArgs[0]));
       //设置输入和输出目录
       TextInputFormat.addInputPath(job, new Path(otherArgs[1]));

       String OUTPATH = otherArgs[2];

       FileSystem fs = FileSystem.get(new URI(OUTPATH), jobConf);
       if (fs.exists(new Path(OUTPATH))) {
fs.delete(new Path(OUTPATH), true);
       }

TextOutputFormat.setOutputPath(job, new Path(OUTPATH));
       System.exit(job.waitForCompletion(true) ? 0 : 1);

   }
}


文章转载自小邱说,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论