电子商务网站是个性化推荐系统重要地应用的领域之一,亚马逊就是个性化推荐系统的积极应用者和推广者,亚马逊的推荐系统深入到网站的各类商品,为亚马逊带来了至少30%的销售额。
    不光是电商类,推荐系统无处不在。QQ,人人网的好友推荐;新浪微博的你可能感觉兴趣的人;优酷,土豆的电影推荐;豆瓣的图书推荐;大从点评的餐饮推荐;世纪佳缘的相亲推荐;天际网的职业推荐等。
推荐算法分类:
    按数据使用划分:
    • 协同过滤算法:UserCF, ItemCF, ModelCF
    • 基于内容的推荐: 用户内容属性和物品内容属性
    • 社会化过滤:基于用户的社会网络关系
基于用户的协同过滤算法UserCF
    基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。
用例说明:

 
基于物品的协同过滤算法ItemCF
    基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。
用例说明:
 
注:基于物品的协同过滤算法,是目前商用最广泛的推荐算法。


案例:天池大数据竞赛

    会开放如下数据类型:

 字 段
 字段说明  
   提取说明
user_id 
用户标记
抽样&字段加密
Time
行为时间
精度到天级别&隐藏年份
action_type
用户对品牌的行为类型
包括点击、购买、加入购物车、收藏4种行为 
 brand_id
品牌数字ID
抽样&字段加密

     提供的数据量,涉及千万级天猫用户,万级天猫品牌,时间跨度4个月的行为记录。 
      
     用户4种行为类型(Type)对应代码分别为: 
     点击:0;购买:1;收藏:2;购物车:3 

数据案例实现原理为:
Co-occurrence Matrix(同现矩阵)和User Preference Vector(用户评分向量)相乘得到的这个Recommended Vector(推荐向量)的意义
 

理论上这两个东西相乘实现Item Based Cooperative Filtering(基于物品的协同过滤),通俗的解释:
    ItemBased:基于物品的(区分于基于用户的)体现在同现矩阵,把所有用户对物品打过分的记录都拿过来,形成一个个反应物品关联度的矩阵Co-occurrence Matrix,下面简称C矩阵。
    为什么乘以User Preference Vector用户评分向量就是Recommended Vector(推荐向量),这个推荐向量又要怎么用呢?

还是用R的第三项24.5来做一下解释,
    R3的解释:对于用户U商品103的可推荐度。
    这点很重要,理解这点就是要知道我们这一系列算法过程在做什么(What)。

把R3也就是R103的计算用公式表示如下
 

R3怎么出来的
    从上面可以看到C103i*Ui就是Ui代表用户对i的喜爱度,C103i代表i和103同时出现的次数,i物品和103同时出现得越多C103i越大,用户对i的喜爱度值越大Ui越大,自然R103值就越大,越值得推荐103

     
    R向量里面的R101, R104,R105和R107这三项值很大,但是我们可以忽略它们,因为用户已经对该物品购买过的,也就是已经买个这些了,可以不推荐了,对于用户没有买过的几项里面选出最大(或者TopN)的物品推荐就可以了,

    上面R102,R103,R106里面选一个最大值103,103就是可以推荐的商品了

代码实现
数据:
    
    每行逗号分隔

思路:


  部分代码:
public class StartRun {

	public static void main(String[] args) {
		Configuration config = new Configuration();
		config.set("fs.defaultFS", "hdfs://node1:8020");
		config.set("yarn.resourcemanager.hostname", "node3");
		
		// 所有mr的输入和输出目录定义在map集合中
		Map<String, String> paths = new HashMap<String, String>();
		paths.put("Step1Input", "/user/itemcf/input/(sample)sam_tianchi_2014002_rec_tmall_log.csv");
		paths.put("Step1Output", "/user/itemcf/output/step1");
		paths.put("Step2Input", paths.get("Step1Output"));
		paths.put("Step2Output", "/user/itemcf/output/step2");
		paths.put("Step3Input", paths.get("Step2Output"));
		paths.put("Step3Output", "/user/itemcf/output/step3");
		paths.put("Step4Input1", paths.get("Step2Output"));
		paths.put("Step4Input2", paths.get("Step3Output"));
		paths.put("Step4Output", "/user/itemcf/output/step4");
		paths.put("Step5Input", paths.get("Step4Output"));
		paths.put("Step5Output", "/user/itemcf/output/step5");
		paths.put("Step6Input", paths.get("Step5Output"));
		paths.put("Step6Output", "/user/itemcf/output/step6");

//		Step1.run(config, paths);
//		Step2.run(config, paths);
//		Step3.run(config, paths);
//		Step4.run(config, paths);
//		Step5.run(config, paths);
		Step6.run(config, paths);
	}

	public static Map<String, Integer> R = new HashMap<String, Integer>();
	static {
		R.put("click", 1);
		R.put("collect", 2);
		R.put("cart", 3);
		R.put("alipay", 4);
	}
}

/**
 * 去重
 * @author root
 */
public class Step1 {

	public static boolean run(Configuration config, Map<String, String> paths) {
		try {
			FileSystem fs = FileSystem.get(config);
			Job job = Job.getInstance(config);
			
			job.setJobName("step1");

//			config.set("mapred.jar", "D:\\MR\\item.jar");

			job.setJarByClass(Step1.class);
			job.setMapperClass(Step1_Mapper.class);
			job.setReducerClass(Step1_Reducer.class);

			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(NullWritable.class);

			FileInputFormat.addInputPath(job, new Path(paths.get("Step1Input")));
			
			Path outpath = new Path(paths.get("Step1Output"));
			if (fs.exists(outpath)) {
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			boolean f = job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			if (key.get() != 0) {
				context.write(value, NullWritable.get());
			}
		}
	}

	static class Step1_Reducer extends Reducer<Text, IntWritable, Text, NullWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> i, Context context)
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
}

/**
 * 按用户分组,计算所有物品出现的组合列表,得到用户对物品的喜爱度得分矩阵
	u13	i160:1,
	u14	i25:1,i223:1,
	u16	i252:1,
	u21	i266:1,
	u24	i64:1,i218:1,i185:1,
	u26	i276:1,i201:1,i348:1,i321:1,i136:1,
 * @author root
 */
public class Step2 {

	public static boolean run(Configuration config, Map<String, String> paths) {
		try {
			FileSystem fs = FileSystem.get(config);
			Job job = Job.getInstance(config);
			
			job.setJobName("step2");
			job.setJarByClass(StartRun.class);
			job.setMapperClass(Step2_Mapper.class);
			job.setReducerClass(Step2_Reducer.class);

			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			FileInputFormat.addInputPath(job, new Path(paths.get("Step2Input")));
			Path outpath = new Path(paths.get("Step2Output"));
			if (fs.exists(outpath)) {
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			boolean f = job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text> {

		// 如果使用:用戶+物品,同时作为输出key,更好
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] tokens = value.toString().split(",");
			String item = tokens[0];
			String user = tokens[1];
			String action = tokens[2];
			Text k = new Text(user);
			Integer rv = StartRun.R.get(action);
			// if(rv!=null){
			Text v = new Text(item + ":" + rv.intValue());
			context.write(k, v);
		}
	}

	static class Step2_Reducer extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> i, Context context)
				throws IOException, InterruptedException {
			Map<String, Integer> r = new HashMap<String, Integer>();

			
			for (Text value : i) {
				String[] vs = value.toString().split(":");
				String item = vs[0];
				Integer action = Integer.parseInt(vs[1]);
				action = ((Integer) (r.get(item) == null ? 0 : r.get(item))).intValue() + action;
				r.put(item, action);
			}
			StringBuffer sb = new StringBuffer();
			for (Entry<String, Integer> entry : r.entrySet()) {
				sb.append(entry.getKey() + ":" + entry.getValue().intValue() + ",");
			}

			context.write(key, new Text(sb.toString()));
		}
	}
}


/**
 * 对物品组合列表进行计数,建立物品的同现矩阵
	i100:i100	3
	i100:i105	1
	i100:i106	1
	i100:i109	1
	i100:i114	1
	i100:i124	1
 * @author root
 */
public class Step3 {
	private final static Text K = new Text();
	private final static IntWritable V = new IntWritable(1);

	public static boolean run(Configuration config, Map<String, String> paths) {
		try {
			FileSystem fs = FileSystem.get(config);
			Job job = Job.getInstance(config);
			job.setJobName("step3");
			job.setJarByClass(StartRun.class);
			job.setMapperClass(Step3_Mapper.class);
			job.setReducerClass(Step3_Reducer.class);
			job.setCombinerClass(Step3_Reducer.class);

			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);

			FileInputFormat
					.addInputPath(job, new Path(paths.get("Step3Input")));
			Path outpath = new Path(paths.get("Step3Output"));
			if (fs.exists(outpath)) {
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			boolean f = job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	// 第二个MR执行的结果--作为本次MR的输入  样本: u2837	 i541:1,i331:1,i314:1,i125:1,
	static class Step3_Mapper extends
			Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] tokens = value.toString().split("\t");
			String[] items = tokens[1].split(",");
			for (int i = 0; i < items.length; i++) {
				String itemA = items[i].split(":")[0];
				for (int j = 0; j < items.length; j++) {
					String itemB = items[j].split(":")[0];
					K.set(itemA + ":" + itemB);
					context.write(K, V);
				}
			}

		}
	}
	

	static class Step3_Reducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> i, Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable v : i) {
				sum = sum + v.get();
			}
			V.set(sum);
			context.write(key, V);
			//  执行结果
//			i100:i181	1
//			i100:i184	2
		}
	}
}

/**
 * 把同现矩阵和得分矩阵相乘
 * 
 * @author root
 */
public class Step4 {

	public static boolean run(Configuration config, Map<String, String> paths) {
		try {
			FileSystem fs = FileSystem.get(config);
			Job job = Job.getInstance(config);
			job.setJobName("step4");
			job.setJarByClass(StartRun.class);
			job.setMapperClass(Step4_Mapper.class);
			job.setReducerClass(Step4_Reducer.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			FileInputFormat.setInputPaths(job,
					new Path[] { new Path(paths.get("Step4Input1")),
							new Path(paths.get("Step4Input2")) });
			Path outpath = new Path(paths.get("Step4Output"));
			if (fs.exists(outpath)) {
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			boolean f = job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text> {
		private String flag;// A同现矩阵 or B得分矩阵

		// 每个maptask,初始化时调用一次
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			FileSplit split = (FileSplit) context.getInputSplit();
			flag = split.getPath().getParent().getName();// 判断读的数据集

			System.out.println(flag + "**********************");
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] tokens = Pattern.compile("[\t,]").split(value.toString());

			if (flag.equals("step3")) {// 同现矩阵
				// 样本:  i100:i181	1
				//       i100:i184	2
				String[] v1 = tokens[0].split(":");
				String itemID1 = v1[0];
				String itemID2 = v1[1];
				String num = tokens[1];

				Text k = new Text(itemID1);// 以前一个物品为key 比如i100
				Text v = new Text("A:" + itemID2 + "," + num);// A:i109,1

				context.write(k, v);

			} else if (flag.equals("step2")) {// 用户对物品喜爱得分矩阵
				// 样本:  u24  i64:1,i218:1,i185:1,
				String userID = tokens[0];
				for (int i = 1; i < tokens.length; i++) {
					String[] vector = tokens[i].split(":");
					String itemID = vector[0];// 物品id
					String pref = vector[1];// 喜爱分数

					Text k = new Text(itemID); // 以物品为key 比如:i100
					Text v = new Text("B:" + userID + "," + pref); // B:u401,2

					context.write(k, v);
				}
			}
		}
	}

	static class Step4_Reducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// A同现矩阵 or B得分矩阵
			// 某一个物品,针对它和其他所有物品的同现次数,都在mapA集合中
			Map<String, Integer> mapA = new HashMap<String, Integer>();
			//和该物品(key中的itemID)同现的其他物品的同现集合//
			//其他物品ID为map的key,同现数字为值
			
			Map<String, Integer> mapB = new HashMap<String, Integer>();
			//该物品(key中的itemID),所有用户的推荐权重分数

			for (Text line : values) {
				String val = line.toString();
				if (val.startsWith("A:")) {// 表示物品同现数字
					String[] kv = Pattern.compile("[\t,]").split(
							val.substring(2));
					try {
						mapA.put(kv[0], Integer.parseInt(kv[1]));
					} catch (Exception e) {
						e.printStackTrace();
					}

				} else if (val.startsWith("B:")) {
					String[] kv = Pattern.compile("[\t,]").split(
							val.substring(2));
					try {
						mapB.put(kv[0], Integer.parseInt(kv[1]));
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}

			
			double result = 0;
			Iterator<String> iter = mapA.keySet().iterator();
			while (iter.hasNext()) {
				String mapk = iter.next();// itemID

				int num = mapA.get(mapk).intValue();
				Iterator<String> iterb = mapB.keySet().iterator();
				while (iterb.hasNext()) {
					String mapkb = iterb.next();// userID
					int pref = mapB.get(mapkb).intValue();
					result = num * pref;// 矩阵乘法相乘计算

					Text k = new Text(mapkb);
					Text v = new Text(mapk + "," + result);
					context.write(k, v);
				}
			}
			
			//  结果样本:  u2723	i9,8.0
		}
	}
}


/**
 * 
 * 把相乘之后的矩阵相加获得结果矩阵
 * 
 * @author root
 */
public class Step5 {
	private final static Text K = new Text();
	private final static Text V = new Text();

	public static boolean run(Configuration config, Map<String, String> paths) {
		try {
			FileSystem fs = FileSystem.get(config);
			Job job = Job.getInstance(config);
			job.setJobName("step5");
			job.setJarByClass(StartRun.class);
			job.setMapperClass(Step5_Mapper.class);
			job.setReducerClass(Step5_Reducer.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);

			FileInputFormat
					.addInputPath(job, new Path(paths.get("Step5Input")));
			Path outpath = new Path(paths.get("Step5Output"));
			if (fs.exists(outpath)) {
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			boolean f = job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text> {

		/**
		 * 原封不动输出
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			//  样本:  u2723	i9,8.0
			String[] tokens = Pattern.compile("[\t,]").split(value.toString());
			Text k = new Text(tokens[0]);// 用户为key
			Text v = new Text(tokens[1] + "," + tokens[2]);
			context.write(k, v);
		}
	}

	static class Step5_Reducer extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			Map<String, Double> map = new HashMap<String, Double>();// 结果

			for (Text line : values) {// i9,8.0
				String[] tokens = line.toString().split(",");
				String itemID = tokens[0];
				Double score = Double.parseDouble(tokens[1]);
				
				if (map.containsKey(itemID)) {
					map.put(itemID, map.get(itemID) + score);// 矩阵乘法求和计算
				} else {
					map.put(itemID, score);
				}
			}

			Iterator<String> iter = map.keySet().iterator();
			while (iter.hasNext()) {
				String itemID = iter.next();
				double score = map.get(itemID);
				Text v = new Text(itemID + "," + score);
				context.write(key, v);
			}
		}
		// 样本:  u13	i9,5.0
	}
}


/**
 * 
 * 按照推荐得分降序排序,每个用户列出10个推荐物品
 * 
 * @author root
 */
public class Step6 {
	private final static Text K = new Text();
	private final static Text V = new Text();

	public static boolean run(Configuration config, Map<String, String> paths) {
		try {
			FileSystem fs = FileSystem.get(config);
			Job job = Job.getInstance(config);
			job.setJobName("step6");
			job.setJarByClass(StartRun.class);
			job.setMapperClass(Step6_Mapper.class);
			job.setReducerClass(Step6_Reducer.class);
			job.setSortComparatorClass(NumSort.class);
			job.setGroupingComparatorClass(UserGroup.class);
			job.setMapOutputKeyClass(PairWritable.class);
			job.setMapOutputValueClass(Text.class);

			FileInputFormat
					.addInputPath(job, new Path(paths.get("Step6Input")));
			Path outpath = new Path(paths.get("Step6Output"));
			if (fs.exists(outpath)) {
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			boolean f = job.waitForCompletion(true);
			return f;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return false;
	}

	static class Step6_Mapper extends
			Mapper<LongWritable, Text, PairWritable, Text> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] tokens = Pattern.compile("[\t,]").split(value.toString());
			String u = tokens[0];
			String item = tokens[1];
			String num = tokens[2];
			PairWritable k = new PairWritable();
			k.setUid(u);
			k.setNum(Double.parseDouble(num));
			V.set(item + ":" + num);
			context.write(k, V);
		}
	}

	static class Step6_Reducer extends Reducer<PairWritable, Text, Text, Text> {
		@Override
		protected void reduce(PairWritable key, Iterable<Text> values,
				Context context) throws IOException, InterruptedException {
			int i = 0;
			StringBuffer sb = new StringBuffer();
			for (Text v : values) {
				if (i == 10)
					break;
				sb.append(v.toString() + ",");
				i++;
			}
			K.set(key.getUid());
			V.set(sb.toString());
			context.write(K, V);
		}

	}

	static class PairWritable implements WritableComparable<PairWritable> {

		private String uid;
		private double num;

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeUTF(uid);
			out.writeDouble(num);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.uid = in.readUTF();
			this.num = in.readDouble();
		}

		@Override
		public int compareTo(PairWritable o) {
			int r = this.uid.compareTo(o.getUid());
			if (r == 0) {
				return Double.compare(this.num, o.getNum());
			}
			return r;
		}

		public String getUid() {
			return uid;
		}

		public void setUid(String uid) {
			this.uid = uid;
		}

		public double getNum() {
			return num;
		}

		public void setNum(double num) {
			this.num = num;
		}

	}

	static class NumSort extends WritableComparator {
		public NumSort() {
			super(PairWritable.class, true);
		}

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			PairWritable o1 = (PairWritable) a;
			PairWritable o2 = (PairWritable) b;

			int r = o1.getUid().compareTo(o2.getUid());
			if (r == 0) {
				return -Double.compare(o1.getNum(), o2.getNum());
			}
			return r;
		}
	}

	static class UserGroup extends WritableComparator {
		public UserGroup() {
			super(PairWritable.class, true);
		}

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			PairWritable o1 = (PairWritable) a;
			PairWritable o2 = (PairWritable) b;
			return o1.getUid().compareTo(o2.getUid());
		}
	}
}
  结果

添加新评论