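E-commerce sites are one of the most important application areas for personalized recommendation systems. Amazon is an active adopter and promoter of them: its recommendation system reaches into every product category on the site and reportedly accounts for at least 30% of Amazon's sales.
Recommendation is not limited to e-commerce; it is everywhere: friend recommendations on QQ and Renren; "people you may be interested in" on Sina Weibo; film recommendations on Youku and Tudou; book recommendations on Douban; restaurant recommendations on Dianping; matchmaking recommendations on Jiayuan; job recommendations on Tianji, and so on.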
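Classification of recommendation algorithms:
By the data they use:
• Collaborative filtering: UserCF, ItemCF, ModelCF
• Content-based recommendation: content attributes of users and content attributes of items
• Social filtering: based on users' social network relationships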
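User-based collaborative filtering (UserCF)
User-based collaborative filtering measures the similarity between users from the ratings that different users give to items, and makes recommendations based on that similarity. Put simply: recommend to a user the items liked by other users whose interests are similar.
Example: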
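The idea can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the class `UserCfSketch`, the choice of cosine similarity, and the ratings in `sample()` are all made up for the example and are not taken from the original article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class UserCfSketch {
    /** Cosine similarity between two users' rating maps (item -> rating). */
    static double cosine(Map<String, Double> a, Map<String, Double> b) {
        double dot = 0, normA = 0, normB = 0;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            normA += e.getValue() * e.getValue();
            Double other = b.get(e.getKey());
            if (other != null) {
                dot += e.getValue() * other;
            }
        }
        for (double v : b.values()) {
            normB += v * v;
        }
        return (normA == 0 || normB == 0) ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** Scores items the target has not rated, weighting other users' ratings by similarity. */
    static Map<String, Double> recommend(String target, Map<String, Map<String, Double>> ratings) {
        Map<String, Double> mine = ratings.get(target);
        Map<String, Double> scores = new TreeMap<>();
        for (Map.Entry<String, Map<String, Double>> user : ratings.entrySet()) {
            if (user.getKey().equals(target)) {
                continue;
            }
            double sim = cosine(mine, user.getValue());
            if (sim <= 0) {
                continue; // users with nothing in common contribute nothing
            }
            for (Map.Entry<String, Double> r : user.getValue().entrySet()) {
                if (!mine.containsKey(r.getKey())) {
                    scores.merge(r.getKey(), sim * r.getValue(), Double::sum);
                }
            }
        }
        return scores;
    }

    /** Made-up ratings, for illustration only. */
    static Map<String, Map<String, Double>> sample() {
        Map<String, Double> alice = new HashMap<>();
        alice.put("book", 5.0);
        alice.put("film", 3.0);
        Map<String, Double> bob = new HashMap<>(alice);
        bob.put("game", 4.0);
        Map<String, Double> carol = new HashMap<>();
        carol.put("song", 2.0);
        Map<String, Map<String, Double>> ratings = new HashMap<>();
        ratings.put("alice", alice);
        ratings.put("bob", bob);
        ratings.put("carol", carol);
        return ratings;
    }

    public static void main(String[] args) {
        System.out.println(recommend("alice", sample()));
    }
}
```

In this sample only bob shares rated items with alice, so the one candidate for alice is the item bob rated and she did not.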
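Item-based collaborative filtering (ItemCF)
Item-based collaborative filtering measures the similarity between items from the ratings that users give to different items, and makes recommendations based on that similarity. Put simply: recommend to a user items that are similar to the ones they liked before.
Example:
Note: item-based collaborative filtering is currently the most widely used recommendation algorithm in commercial systems.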
Case study: the Tianchi big data competition
The following data fields are made available:
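Field | Description | Extraction notes |
user_id | User identifier | Sampled; field encrypted |
Time | Time of the action | Day-level precision; year hidden |
action_type | Type of user action on a brand | One of four actions: click, purchase, add to cart, collect |
brand_id | Numeric brand ID | Sampled; field encrypted |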
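The data provided covers on the order of ten million Tmall users and tens of thousands of Tmall brands, with behavior records spanning four months.
The four user action types (Type) are encoded as follows:
click: 0; purchase: 1; collect: 2; cart: 3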
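How the example works:
What does the Recommended Vector mean, given that it is obtained by multiplying the Co-occurrence Matrix by the User Preference Vector?
In theory, multiplying the two implements Item-Based Collaborative Filtering. In plain terms:
ItemBased: the item-based part (as opposed to user-based) lives in the co-occurrence matrix. Take every record of a user rating an item and build from them a matrix that reflects how strongly items are associated with each other, the Co-occurrence Matrix, called the C matrix below.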
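As a rough sketch of how such a C matrix can be built, the snippet below counts, for every pair of items, how many users have both. The class name `CooccurrenceSketch` and the three small baskets in `main` are assumptions made for illustration; each basket is assumed to list a user's items without repeats.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CooccurrenceSketch {
    /** Returns item -> (other item -> number of users who have both). */
    static Map<String, Map<String, Integer>> build(List<List<String>> baskets) {
        Map<String, Map<String, Integer>> c = new TreeMap<>();
        for (List<String> basket : baskets) {
            for (String a : basket) {
                for (String b : basket) {
                    // Pairs are counted in both directions, and an item also
                    // co-occurs with itself, which fills the diagonal.
                    c.computeIfAbsent(a, k -> new TreeMap<>()).merge(b, 1, Integer::sum);
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        // Hypothetical data: one list of distinct items per user.
        List<List<String>> baskets = Arrays.asList(
                Arrays.asList("101", "102", "103"),
                Arrays.asList("101", "103"),
                Arrays.asList("102"));
        System.out.println(build(baskets));
    }
}
```

The diagonal entry of an item ends up being the number of users who have that item, and the matrix is symmetric because every pair is counted in both directions.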
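Why does multiplying by the User Preference Vector yield the Recommended Vector, and how is that vector then used?
Take the third entry of R, 24.5, as an example.
What R3 means: how recommendable item 103 is for user U.
This point matters: understanding it means knowing what this whole sequence of computations is actually doing (the What).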
The computation of R3, that is R103, can be written as a formula:
R103 = Σi (C103i × Ui), summed over every item i
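In each term C103i × Ui, Ui is the user's preference for item i and C103i is the number of times i and 103 occur together. The more often item i co-occurs with 103, the larger C103i; the more the user likes i, the larger Ui; so the larger R103 becomes, and the more 103 is worth recommending.
The entries R101, R104, R105 and R107 of the R vector are large, but they can be ignored: the user has already purchased those items, so there is no need to recommend them. Among the items the user has not bought, pick the one with the largest value (or the Top N) and recommend it.
Here, among R102, R103 and R106, the largest value belongs to 103, so 103 is the item to recommend.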
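The calculation can be reproduced with a small matrix-vector product. The figure with the actual numbers is not part of the text, so the matrix `C` and the vector `U` below are assumptions: illustrative values chosen to agree with what the text states (R103 = 24.5, and items 101, 104, 105 and 107 already rated). The class and method names are likewise hypothetical.

```java
class RecommendVectorSketch {
    static final int[] ITEMS = {101, 102, 103, 104, 105, 106, 107};

    // Illustrative co-occurrence matrix (assumed values, rows and columns in ITEMS order).
    static final double[][] C = {
        {5, 3, 4, 4, 2, 2, 1},
        {3, 3, 3, 2, 1, 1, 0},
        {4, 3, 4, 3, 1, 2, 0},
        {4, 2, 3, 4, 2, 2, 1},
        {2, 1, 1, 2, 2, 1, 1},
        {2, 1, 2, 2, 1, 2, 0},
        {1, 0, 0, 1, 1, 0, 1},
    };

    // Illustrative preference vector: 0 means the user has no record for that item.
    static final double[] U = {2.0, 0.0, 0.0, 4.0, 4.5, 0.0, 5.0};

    /** R = C x U: every entry of R is the sum of C[row][i] * U[i] over all items i. */
    static double[] multiply(double[][] c, double[] u) {
        double[] r = new double[c.length];
        for (int row = 0; row < c.length; row++) {
            for (int i = 0; i < u.length; i++) {
                r[row] += c[row][i] * u[i];
            }
        }
        return r;
    }

    /** Index of the highest-scoring item the user has no preference for yet, or -1. */
    static int bestUnseen(double[] r, double[] u) {
        int best = -1;
        for (int i = 0; i < r.length; i++) {
            if (u[i] == 0.0 && (best < 0 || r[i] > r[best])) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[] r = multiply(C, U);
        System.out.println("R103 = " + r[2]);
        System.out.println("recommend item " + ITEMS[bestUnseen(r, U)]);
    }
}
```

With these assumed inputs the third entry of R comes out as 24.5, and 103 is the best item among the three the user has not rated.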
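Implementation
Data:
Each line is comma-separated. Judging from how Step2 parses a line, the fields are item, user, action; the first line is presumably a header, since Step1 skips it.
Approach:
The work is split into six MapReduce jobs, each reading the output of an earlier one:
• Step1: remove duplicate records
• Step2: group by user to build the user-item preference score matrix
• Step3: count item pairs to build the item co-occurrence matrix
• Step4: multiply the co-occurrence matrix by the score matrix
• Step5: add up the partial products to get the result matrix
• Step6: sort by recommendation score in descending order and list 10 items per user
Partial code: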
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class StartRun {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node3");
        // The input and output directories of every MapReduce job are defined in this map.
        Map<String, String> paths = new HashMap<String, String>();
        paths.put("Step1Input", "/user/itemcf/input/(sample)sam_tianchi_2014002_rec_tmall_log.csv");
        paths.put("Step1Output", "/user/itemcf/output/step1");
        paths.put("Step2Input", paths.get("Step1Output"));
        paths.put("Step2Output", "/user/itemcf/output/step2");
        paths.put("Step3Input", paths.get("Step2Output"));
        paths.put("Step3Output", "/user/itemcf/output/step3");
        paths.put("Step4Input1", paths.get("Step2Output"));
        paths.put("Step4Input2", paths.get("Step3Output"));
        paths.put("Step4Output", "/user/itemcf/output/step4");
        paths.put("Step5Input", paths.get("Step4Output"));
        paths.put("Step5Output", "/user/itemcf/output/step5");
        paths.put("Step6Input", paths.get("Step5Output"));
        paths.put("Step6Output", "/user/itemcf/output/step6");
        // The jobs are run one at a time: uncomment the step to execute.
        // Step1.run(config, paths);
        // Step2.run(config, paths);
        // Step3.run(config, paths);
        // Step4.run(config, paths);
        // Step5.run(config, paths);
        Step6.run(config, paths);
    }

    // Weight of each action type when scoring a user's preference for an item.
    public static Map<String, Integer> R = new HashMap<String, Integer>();
    static {
        R.put("click", 1);
        R.put("collect", 2);
        R.put("cart", 3);
        R.put("alipay", 4);
    }
}
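import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Remove duplicate records.
 * @author root
 */
public class Step1 {
    public static boolean run(Configuration config, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("step1");
            // config.set("mapred.jar", "D:\\MR\\item.jar");
            job.setJarByClass(Step1.class);
            job.setMapperClass(Step1_Mapper.class);
            job.setReducerClass(Step1_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step1Input")));
            // Delete a stale output directory, otherwise the job refuses to start.
            Path outpath = new Path(paths.get("Step1Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Skip the line at byte offset 0 (presumably the CSV header). Every other line
            // becomes the key, so identical lines end up in the same reduce call.
            if (key.get() != 0) {
                context.write(value, NullWritable.get());
            }
        }
    }

    // The input value type has to match the mapper's output value type (NullWritable).
    static class Step1_Reducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Write each distinct line once, however many duplicates were grouped under it.
            context.write(key, NullWritable.get());
        }
    }
}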
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Group by user and collect every item the user acted on,
 * producing the user-item preference score matrix, for example:
 * u13 i160:1,
 * u14 i25:1,i223:1,
 * u16 i252:1,
 * u21 i266:1,
 * u24 i64:1,i218:1,i185:1,
 * u26 i276:1,i201:1,i348:1,i321:1,i136:1,
 * @author root
 */
public class Step2 {
    public static boolean run(Configuration config, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("step2");
            job.setJarByClass(StartRun.class);
            job.setMapperClass(Step2_Mapper.class);
            job.setReducerClass(Step2_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step2Input")));
            Path outpath = new Path(paths.get("Step2Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Using user + item together as the output key would be better.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if (tokens.length < 3) {
                return; // skip malformed lines
            }
            String item = tokens[0];
            String user = tokens[1];
            String action = tokens[2];
            Integer rv = StartRun.R.get(action);
            // Skip actions that have no weight in StartRun.R; without this check an
            // unknown action would cause a NullPointerException.
            if (rv != null) {
                context.write(new Text(user), new Text(item + ":" + rv.intValue()));
            }
        }
    }

    static class Step2_Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // item -> accumulated score of this user for the item
            Map<String, Integer> r = new HashMap<String, Integer>();
            for (Text value : values) {
                String[] vs = value.toString().split(":");
                String item = vs[0];
                int score = Integer.parseInt(vs[1]);
                Integer previous = r.get(item);
                r.put(item, previous == null ? score : previous + score);
            }
            StringBuilder sb = new StringBuilder();
            for (Entry<String, Integer> entry : r.entrySet()) {
                sb.append(entry.getKey()).append(":").append(entry.getValue().intValue()).append(",");
            }
            context.write(key, new Text(sb.toString()));
        }
    }
}
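Stripped of the Hadoop plumbing, Step2 is a group-and-sum over the log lines. The following in-memory sketch shows the same aggregation; it assumes the item,user,action line layout that Step2_Mapper parses and reuses the weights from StartRun.R, while the class name `UserItemScoreSketch` and the sample lines are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class UserItemScoreSketch {
    // Same action weights as StartRun.R.
    static final Map<String, Integer> WEIGHTS = new HashMap<>();
    static {
        WEIGHTS.put("click", 1);
        WEIGHTS.put("collect", 2);
        WEIGHTS.put("cart", 3);
        WEIGHTS.put("alipay", 4);
    }

    /** Returns user -> (item -> summed action weight) from "item,user,action" lines. */
    static Map<String, Map<String, Integer>> score(List<String> lines) {
        Map<String, Map<String, Integer>> scores = new TreeMap<>();
        for (String line : lines) {
            String[] t = line.split(",");
            if (t.length < 3) {
                continue; // malformed line
            }
            Integer weight = WEIGHTS.get(t[2]);
            if (weight == null) {
                continue; // action without a weight
            }
            scores.computeIfAbsent(t[1], k -> new TreeMap<>()).merge(t[0], weight, Integer::sum);
        }
        return scores;
    }

    public static void main(String[] args) {
        // Hypothetical log lines in item,user,action order.
        List<String> lines = Arrays.asList(
                "i1,u1,click", "i1,u1,alipay", "i2,u1,cart", "i1,u2,unknown", "bad");
        System.out.println(score(lines));
    }
}
```

A click followed by a purchase of the same item adds up to a score of 5 for that user and item, which is the kind of value that later lands in the preference vector.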
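import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Count the item pairs to build the item co-occurrence matrix, for example:
 * i100:i100 3
 * i100:i105 1
 * i100:i106 1
 * i100:i109 1
 * i100:i114 1
 * i100:i124 1
 * @author root
 */
public class Step3 {
    public static boolean run(Configuration config, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("step3");
            job.setJarByClass(StartRun.class);
            job.setMapperClass(Step3_Mapper.class);
            job.setReducerClass(Step3_Reducer.class);
            // Summing is associative, so the reducer can double as the combiner.
            job.setCombinerClass(Step3_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step3Input")));
            Path outpath = new Path(paths.get("Step3Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    // Input is the output of Step2, for example: u2837 i541:1,i331:1,i314:1,i125:1,
    static class Step3_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Each task class owns its Writable instances. A static IntWritable shared with the
        // combiner would let the combiner overwrite the constant 1 that the mapper emits.
        private final Text pair = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            if (tokens.length < 2) {
                return; // user without items
            }
            String[] items = tokens[1].split(",");
            // Emit every ordered pair of the user's items, including each item with itself.
            for (int i = 0; i < items.length; i++) {
                String itemA = items[i].split(":")[0];
                for (int j = 0; j < items.length; j++) {
                    String itemB = items[j].split(":")[0];
                    pair.set(itemA + ":" + itemB);
                    context.write(pair, one);
                }
            }
        }
    }

    static class Step3_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum = sum + v.get();
            }
            total.set(sum);
            context.write(key, total);
            // Sample output:
            // i100:i181 1
            // i100:i184 2
        }
    }
}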
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Multiply the co-occurrence matrix by the score matrix.
 *
 * @author root
 */
public class Step4 {
    // Fields are separated by a tab or a comma.
    private static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static boolean run(Configuration config, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("step4");
            job.setJarByClass(StartRun.class);
            job.setMapperClass(Step4_Mapper.class);
            job.setReducerClass(Step4_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Two inputs: the score matrix from Step2 and the co-occurrence matrix from Step3.
            FileInputFormat.setInputPaths(job,
                    new Path[] { new Path(paths.get("Step4Input1")),
                            new Path(paths.get("Step4Input2")) });
            Path outpath = new Path(paths.get("Step4Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag; // which data set is being read: A (co-occurrence) or B (scores)

        // Called once when each map task is initialized.
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            // The parent directory name ("step2" or "step3") identifies the data set.
            flag = split.getPath().getParent().getName();
            System.out.println(flag + "**********************");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = DELIMITER.split(value.toString());
            if (flag.equals("step3")) { // co-occurrence matrix
                // Sample: i100:i181 1
                //         i100:i184 2
                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];
                Text k = new Text(itemID1); // keyed by the first item, e.g. i100
                Text v = new Text("A:" + itemID2 + "," + num); // A:i109,1
                context.write(k, v);
            } else if (flag.equals("step2")) { // user-item preference score matrix
                // Sample: u24 i64:1,i218:1,i185:1,
                String userID = tokens[0];
                for (int i = 1; i < tokens.length; i++) {
                    String[] vector = tokens[i].split(":");
                    String itemID = vector[0]; // item ID
                    String pref = vector[1]; // preference score
                    Text k = new Text(itemID); // keyed by item, e.g. i100
                    Text v = new Text("B:" + userID + "," + pref); // B:u401,2
                    context.write(k, v);
                }
            }
        }
    }

    static class Step4_Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // mapA: for the item in the key, its co-occurrence count with every other item
            // (other item ID -> count).
            Map<String, Integer> mapA = new HashMap<String, Integer>();
            // mapB: for the item in the key, every user's preference score
            // (user ID -> score).
            Map<String, Integer> mapB = new HashMap<String, Integer>();
            for (Text line : values) {
                String val = line.toString();
                if (val.startsWith("A:")) { // co-occurrence count
                    String[] kv = DELIMITER.split(val.substring(2));
                    try {
                        mapA.put(kv[0], Integer.parseInt(kv[1]));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else if (val.startsWith("B:")) { // user preference score
                    String[] kv = DELIMITER.split(val.substring(2));
                    try {
                        mapB.put(kv[0], Integer.parseInt(kv[1]));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            // Emit one partial product per (co-occurring item, user) pair.
            for (Map.Entry<String, Integer> a : mapA.entrySet()) {
                String itemID = a.getKey();
                int num = a.getValue().intValue();
                for (Map.Entry<String, Integer> b : mapB.entrySet()) {
                    String userID = b.getKey();
                    int pref = b.getValue().intValue();
                    double result = num * pref; // one term of the matrix multiplication
                    context.write(new Text(userID), new Text(itemID + "," + result));
                }
            }
            // Sample output: u2723 i9,8.0
        }
    }
}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *
 * Add up the partial products to obtain the result matrix.
 *
 * @author root
 */
public class Step5 {
    private static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static boolean run(Configuration config, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("step5");
            job.setJarByClass(StartRun.class);
            job.setMapperClass(Step5_Mapper.class);
            job.setReducerClass(Step5_Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step5Input")));
            Path outpath = new Path(paths.get("Step5Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Pass each record through unchanged, keyed by user.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Sample: u2723 i9,8.0
            String[] tokens = DELIMITER.split(value.toString());
            Text k = new Text(tokens[0]); // the user is the key
            Text v = new Text(tokens[1] + "," + tokens[2]);
            context.write(k, v);
        }
    }

    static class Step5_Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // item -> total recommendation score for this user
            Map<String, Double> map = new HashMap<String, Double>();
            for (Text line : values) { // i9,8.0
                String[] tokens = line.toString().split(",");
                String itemID = tokens[0];
                Double score = Double.parseDouble(tokens[1]);
                if (map.containsKey(itemID)) {
                    map.put(itemID, map.get(itemID) + score); // summation part of the matrix multiplication
                } else {
                    map.put(itemID, score);
                }
            }
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                context.write(key, new Text(entry.getKey() + "," + entry.getValue()));
            }
            // Sample output: u13 i9,5.0
        }
    }
}
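Step4 and Step5 together compute the matrix-vector product in two phases: Step4 emits one partial product per (user, candidate item) term, and Step5 sums them. The in-memory sketch below mirrors that split so the arithmetic can be checked by hand. The class `PartialProductSketch` and its tiny sample matrices are assumptions made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

class PartialProductSketch {
    /**
     * c: item -> (other item -> co-occurrence count); prefs: user -> (item -> score).
     * Returns user -> (candidate item -> recommendation score).
     */
    static Map<String, Map<String, Double>> recommend(
            Map<String, Map<String, Integer>> c, Map<String, Map<String, Integer>> prefs) {
        Map<String, Map<String, Double>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Integer>> user : prefs.entrySet()) {
            for (Map.Entry<String, Integer> pref : user.getValue().entrySet()) {
                // Step4 joins the two inputs on this item.
                Map<String, Integer> row = c.get(pref.getKey());
                if (row == null) {
                    continue;
                }
                for (Map.Entry<String, Integer> co : row.entrySet()) {
                    // Step4 emits this partial product...
                    double partial = co.getValue() * pref.getValue();
                    // ...and Step5 adds it to the running total for (user, candidate item).
                    result.computeIfAbsent(user.getKey(), k -> new TreeMap<>())
                            .merge(co.getKey(), partial, Double::sum);
                }
            }
        }
        return result;
    }

    static void put(Map<String, Map<String, Integer>> m, String row, String col, int value) {
        m.computeIfAbsent(row, k -> new TreeMap<>()).put(col, value);
    }

    /** Hypothetical 2x2 co-occurrence matrix. */
    static Map<String, Map<String, Integer>> sampleC() {
        Map<String, Map<String, Integer>> c = new TreeMap<>();
        put(c, "i1", "i1", 2);
        put(c, "i1", "i2", 1);
        put(c, "i2", "i1", 1);
        put(c, "i2", "i2", 1);
        return c;
    }

    /** Hypothetical preference scores for two users. */
    static Map<String, Map<String, Integer>> samplePrefs() {
        Map<String, Map<String, Integer>> p = new TreeMap<>();
        put(p, "u1", "i1", 3);
        put(p, "u2", "i1", 1);
        put(p, "u2", "i2", 2);
        return p;
    }

    public static void main(String[] args) {
        System.out.println(recommend(sampleC(), samplePrefs()));
    }
}
```

For u2 the score of i1 is 2×1 + 1×2 = 4: one term per item the user has a preference for, which is the same sum the formula for R103 describes.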
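import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *
 * Sort by recommendation score in descending order and list 10 recommended items per user.
 * Note: items the user has already acted on are not filtered out here.
 *
 * @author root
 */
public class Step6 {
    private static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static boolean run(Configuration config, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJobName("step6");
            job.setJarByClass(StartRun.class);
            job.setMapperClass(Step6_Mapper.class);
            job.setReducerClass(Step6_Reducer.class);
            // Secondary sort: order by user and descending score, but group by user only.
            job.setSortComparatorClass(NumSort.class);
            job.setGroupingComparatorClass(UserGroup.class);
            job.setMapOutputKeyClass(PairWritable.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step6Input")));
            Path outpath = new Path(paths.get("Step6Output"));
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class Step6_Mapper extends Mapper<LongWritable, Text, PairWritable, Text> {
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input is the output of Step5, for example: u13 i9,5.0
            String[] tokens = DELIMITER.split(value.toString());
            String u = tokens[0];
            String item = tokens[1];
            String num = tokens[2];
            PairWritable k = new PairWritable();
            k.setUid(u);
            k.setNum(Double.parseDouble(num));
            outValue.set(item + ":" + num);
            context.write(k, outValue);
        }
    }

    static class Step6_Reducer extends Reducer<PairWritable, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void reduce(PairWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            // Values arrive sorted by descending score, so the first 10 are the top 10.
            int i = 0;
            StringBuilder sb = new StringBuilder();
            for (Text v : values) {
                if (i == 10)
                    break;
                sb.append(v.toString()).append(",");
                i++;
            }
            outKey.set(key.getUid());
            outValue.set(sb.toString());
            context.write(outKey, outValue);
        }
    }

    // Composite key: user ID plus recommendation score.
    static class PairWritable implements WritableComparable<PairWritable> {
        private String uid;
        private double num;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(uid);
            out.writeDouble(num);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.uid = in.readUTF();
            this.num = in.readDouble();
        }

        @Override
        public int compareTo(PairWritable o) {
            int r = this.uid.compareTo(o.getUid());
            if (r == 0) {
                return Double.compare(this.num, o.getNum());
            }
            return r;
        }

        @Override
        public int hashCode() {
            // Hash on the user only, so that the default HashPartitioner sends all of a
            // user's records to the same reducer when more than one reducer is used.
            return uid.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PairWritable)) {
                return false;
            }
            PairWritable p = (PairWritable) o;
            return uid.equals(p.uid) && Double.compare(num, p.num) == 0;
        }

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public double getNum() {
            return num;
        }

        public void setNum(double num) {
            this.num = num;
        }
    }

    // Sort comparator: by user ascending, then by score descending.
    static class NumSort extends WritableComparator {
        public NumSort() {
            super(PairWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable o1 = (PairWritable) a;
            PairWritable o2 = (PairWritable) b;
            int r = o1.getUid().compareTo(o2.getUid());
            if (r == 0) {
                return -Double.compare(o1.getNum(), o2.getNum());
            }
            return r;
        }
    }

    // Grouping comparator: keys with the same user go into one reduce call.
    static class UserGroup extends WritableComparator {
        public UserGroup() {
            super(PairWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable o1 = (PairWritable) a;
            PairWritable o2 = (PairWritable) b;
            return o1.getUid().compareTo(o2.getUid());
        }
    }
}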
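What Step6 achieves with a sort comparator and a grouping comparator can be pictured, for a single user, as sorting that user's scores in descending order and keeping the first N. The sketch below does exactly that in memory; the class `TopNSketch`, the tie-break by item ID, and the sample scores are assumptions for illustration, and the output strings follow the item:score format Step6 writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TopNSketch {
    /** Returns up to n "item:score" strings, highest score first. */
    static List<String> topN(Map<String, Double> scores, int n) {
        List<Map.Entry<String, Double>> entries = new ArrayList<>(scores.entrySet());
        entries.sort((a, b) -> {
            // Descending by score, like NumSort negating Double.compare.
            int byScore = Double.compare(b.getValue(), a.getValue());
            // Tie-break by item ID so the order is deterministic (an added assumption).
            return byScore != 0 ? byScore : a.getKey().compareTo(b.getKey());
        });
        List<String> top = new ArrayList<>();
        for (Map.Entry<String, Double> e : entries) {
            if (top.size() == n) {
                break;
            }
            top.add(e.getKey() + ":" + e.getValue());
        }
        return top;
    }

    /** Hypothetical scores of one user. */
    static Map<String, Double> sample() {
        Map<String, Double> scores = new TreeMap<>();
        scores.put("i1", 3.0);
        scores.put("i2", 8.0);
        scores.put("i3", 5.0);
        return scores;
    }

    public static void main(String[] args) {
        System.out.println(topN(sample(), 2));
    }
}
```

The MapReduce version avoids holding a user's whole score list in memory: the framework delivers the values already sorted, so the reducer only has to stop after ten.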
Results