矩阵分解推荐算法-----LFM
因为很多时候,读者对于书籍的分类与编辑对书籍的分类不同,比如《具体数学》这本书,有的人认为是数学, 有的人认为属于计算机.内容是属于数学的,而大多数读者属于计算机. 编辑的分类是从内容上出发的, 而不是从书的读者出发.
如果我们从数据出发的,自动地找到那些类,然后进行个性化推荐的技术就是隐含语义分析技术(latent variable analysis). 隐含语义分析技术采用的是基于用户行为统计的自动聚类.
而LFM就是隐含语义分析技术的一个著名的模型算法.
LFM通过如下公式计算用户u对物品i的兴趣:
\]
其中\(p_{u,k}和q_{i, k}\)是模型的参数, 其中\(p_{u, k}\)度量了用户u的兴趣和第k个隐类的关系, \(q_{i,k}\)度量了第k个隐类和物品i之间的关系. K表示隐类的数量.
常用的推荐算法的损失函数:
\]
其中\(\lambda ||p_u||^2 + \lambda ||q_i||^2\)是用来防止过拟合的正则化项.
我们对损失函数求导:
\\
\frac{\partial C}{\partial q_{ik}} = -2p_{uk}\cdot e_{ui} + 2\lambda q_{ik}\\
\\
其中e_{iu} = r_{iu}-\hat r_{iu}
\]
通过LFM实现简单的音乐推荐, 数据集为data.csv, song.csv (点击可下载)
# !/usr/bin/python3
# @Author: XiaoXia
# @Time : 2020/8/14 10:13
# @Site :
# @Software: PyCharm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
def LFM(R: np.ndarray, P: np.ndarray, Q: np.ndarray, K: int, steps=5000, learning_rate: float = 0.01, beta: float = 0.02, min_loss: float = 0.001, min_interval: float = 1e-3):
"""
LFM算法
:param min_interval: 梯度下降时,每次下降的最小损失差, 小于则停止梯度下降
:param R: 用户-歌曲矩阵
:param P: 用户隐语义矩阵
:param Q: 歌曲隐语义矩阵
:param K: 隐类数量
:param steps: 学习迭代次数
:param learning_rate: 学习区
:param beta: 正则化项系数
:param min_loss: 最小损失值
:return: 学习后的P和Q
"""
Q = Q.T
loss = [get_loss(R, P, Q, K)]
for t in range(steps):
for i in range(len(R)):
for j in range(len(R[0])):
eij = R[i][j] - P[i, :] @ Q[:, j]
print(\'Pi\', P[i, :])
print(\'Qj\', Q[:, j])
print(\'eij\', eij)
for k in range(K):
P[i][k] = P[i][k] + learning_rate * (eij * Q[k][j] - beta * P[i][k])
Q[k][j] = Q[k][j] + learning_rate * (eij * P[i][k] - beta * Q[k][j])
print("第%d次" % t)
loss.append(get_loss(R, P, Q, K, beta))
print(\'loss:\', loss)
if loss[-1] <= min_loss:
break
print(abs(loss[-1] - loss[-2]), min_interval, abs(loss[-1] - loss[-2]) <= min_interval)
if abs(loss[-1] - loss[-2]) <= min_interval:
break
return P, Q.T, loss
def get_loss(R: np.array, P: np.array, Q: np.array, K: int, beta=0.02):
"""
求损失值
:param R: 用户-歌曲矩阵
:param P: 用户隐语义矩阵
:param Q: 歌曲隐语义矩阵
:param K: 隐类数量
:param beta: 正则化项系数
:return:
"""
R_new = P @ Q
loss = beta / 2 * (P ** 2).sum() + beta / 2 * (Q ** 2).sum()
for i in range(len(R)):
for j in range(len(R[0])):
if R[i][j] != 0:
loss += (R[i][j] - R_new[i][j]) ** 2
return loss
def read_data(path: str):
"""
读取csv文件
:param path: 文件路径
:return: 读取到的csv文件的DataFrame
"""
return pd.read_csv(path)
def get_R(data: pd.DataFrame):
"""
对数据进行处理并转换为用户-音乐矩阵R
:param data:
:return: 用户-音乐矩阵R(DataFrame)
"""
# 以歌曲播放量占用户总播放量的比例为评分标准
all_count = data.groupby(\'user\')[\'play_count\'].sum().reset_index()
all_count.columns = [\'user\', \'all_count\']
new_data = pd.merge(data, all_count, on=\'user\')
new_data.loc[:, \'score\'] = new_data.loc[:, \'play_count\'] / new_data.loc[:, \'all_count\']
# 以播放最多的歌曲为1分,其余的歌曲的分数为播放次数占播放最多的歌曲的次数的比例
# max_count_data = data.groupby(\'user\')[\'play_count\'].max().reset_index()
# max_count_data.columns = [\'user\', \'max_count\']
# new_data = data.merge(max_count_data, on=\'user\')
# new_data.loc[:, \'score\'] = new_data.loc[:, \'play_count\'] / new_data.loc[:, \'max_count\']
R = new_data.pivot_table(index=\'user\', columns=\'song\', values=\'score\')
R = R.fillna(0)
return R
def init_P_Q(R: np.array, K: int):
"""
初始化LFM的两个矩阵P和Q
:param R:
:param K:
:return:
"""
P = np.random.rand(len(R), K)
Q = np.random.rand(len(R[0]), K)
return P, Q
def plot_loss(loss, learning_rate=0.01):
"""
绘制损失函数曲线
:param loss:
:param learning_rate:
:return:
"""
plt.plot([i * learning_rate for i in range(len(loss))], loss)
plt.title(\'loss learning curve\')
plt.xlabel(\'step\')
plt.ylabel(\'loss\')
plt.show()
def recommended_song(R, R_hat, user):
"""
根据推荐结果进行推荐
:param R: 原用户-歌曲矩阵
:param R_hat: 预测后的用户歌曲矩阵
:param user: 需要推荐的用户
:return:
"""
recommended = R_hat.loc[user, :]
recommended.sort_values(ascending=False, inplace=True)
recommended_list = recommended.index.to_list()
recommended_score = list(recommended.values)
songs = pd.read_csv(\'./data/song.csv\', index_col=0)
print(\'推荐评分前50个(包括以前听过的):\')
for i in range(50):
song = songs[songs.loc[:, \'song_id\']==recommended_list[i]]
print(\'歌曲: %s, 歌手: %s, 专辑: %s, 评分为: %lf\' % (song["title"].values[0], song[\'artist_name\'].values[0], song[\'release\'].values[0], recommended_score[i]))
new_list = []
for i in range(len(recommended_list)):
if R.loc[user, recommended_list[i]] == 0:
new_list.append((recommended_list[i], recommended_score[i]))
print("==============================================")
print(\'推荐评分前50个(不包括以前听过的):\')
for i in range(50):
song = songs[songs.loc[:, \'song_id\'] == new_list[i][0]]
print(\'歌曲: %s, 歌手: %s, 专辑: %s, 评分为: %lf\' % (song["title"].values[0], song[\'artist_name\'].values[0], song[\'release\'].values[0], new_list[i][3]))
if __name__ == \'__main__\':
data = read_data(\'data/data.csv\')
R = get_R(data)
K = 50
learning_rate = 0.01
beta = 0
steps = 10
P, Q = init_P_Q(np.array(R), K)
print(get_loss(np.array(R), P, Q.T, K))
P, Q, loss = LFM(np.array(R), P, Q, K, learning_rate=learning_rate, steps=steps, beta=beta)
print(loss)
plot_loss(loss, learning_rate=learning_rate)
R_hat = P @ Q.T
R_hat = pd.DataFrame(R_hat, index=R.index, columns=R.columns)
s = input("请输入需要推荐的用户的用户名:")
recommended_song(R, R_hat, s)
博客地址: https://xiaoxiablogs.top