在sql中使用的连接包括innerjoin,leftjoin,rightjoin,allouter几种连接方式。在大数据中实现这几种连接方式如下:
protected void reduce(Text key, Iterable<UserLog> values, Context context) throws IOException, InterruptedException {
users.clear();
logs.clear();
for (UserLog userLog : values) {
System.out.println("UserReduce reduce :" + key + "==" + userLog.toString());
if (userLog.getType() .equals("I") ) {
users.add(new Text(userLog.getData()));
} else {
logs.add(new Text(userLog.getData()));
}
}
System.out.println("user size:" + users.size() + ";log size:" + logs.size());
if (joinType.equals("innerJoin")) {
if (users.size() > 0 && logs.size() > 0) {
for (Text user : users) {
for (Text log : logs) {
context.write(user, log);
}
}
}
} else if (joinType.equals("leftOuter")) {
for (Text user : users) {
if (logs.size() > 0) {
for (Text log : logs) {
context.write(user, log);
}
} else {
context.write(user, createEmptyLog());
}
}
} else if (joinType.equals("rightOuter")) {
for (Text log : logs) {
if (users.size() > 0) {
for (Text user : users) {
context.write(user, log);
}
}else{
context.write( createEmptyLog(),log);
}
}
} else if (joinType.equals("allOuter")) {
if (users.size() > 0) {
for (Text user : users) {
if (logs.size() > 0) {
for (Text log : logs) {
context.write(user, log);
}
} else {
context.write(user, createEmptyLog());
}
}
} else {
for (Text log : logs) {
if (users.size() > 0) {
for (Text user : users) {
context.write(user, log);
}
}else {
context.write(createEmptyLog(), log);
}
}
}
}
}
文章转载自小邱说,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




