软件目录规范下的AMT+购物车(简易版)的实现
项目名:ATM+购物车(简易版)
项目需求:
1.额度15000或自定义 --> 注册功能
2.实现购物商城,买东西加入购物车,调用信用卡接口结账 --> 购物功能、支付功能
3.可以提现,手续费5% --> 提现功能
4.支持多账户登录 --> 登录功能
5.支持账户间转账 --> 转账功能
6.记录日常消费 --> 记录流水功能
7.提供还款接口 --> 还款功能
8.ATM记录操作日志 --> 记录日志功能
9.提供管理接口,包括添加账户、用户额度,冻结账户等。。。 ---> 管理员功能
10.用户认证用装饰器 --> 登录认证装饰器
所需实现的功能目录
1、注册功能
2、登录功能
3、查看余额
4、提现功能
5、还款功能
6、转账功能
7、查看流水
8、购物功能
9、查看购物车
10、管理员功能
程序的架构设计
项目实现
用户操作的用户视图层
core/src.py
\'\'\'
视图层
\'\'\'
from interface import user_interface,bank_interface,shop_interface
from lib import common
#记录用户登录状态
Login_User = None
# 1、注册功能
def register():
while True:
print("============注册功能============")
username = input("请输入用户名:").strip()
password = input("请输入密码:").strip()
re_pwd = input("请再次输入密码:").strip()
if re_pwd == password:
flag,msg = user_interface.register_interface(username,password)
#根据flag判断用户是否注册成功
if flag:
print(msg)
break
else:
print(msg)
else:
print("两次密码输入不一致请重新输入")
# 2、登录功能
def login():
while True:
print("============登录功能============")
username = input("请输入用户名:").strip()
password = input("请输入密码:").strip()
flag,msg = user_interface.user_interface(username,password)
if flag:
print(msg)
global Login_User
Login_User = username
break
else:
print(msg)
@common.login_auth
# 3、查看余额
def check_balance():
print("============查看余额============")
#调用查询余额接口,获取用户余额
balance = user_interface.check_interface(Login_User)
print(f\'用户:{Login_User} 所剩余额:{balance}\')
# 4、提现功能
@common.login_auth
def withdraw():
while True:
print("============提现功能============")
money = input("请输入提现金额").strip()
if not money.isdigit():
print("输入非法,请重新输入")
continue
#将金额交给接口层处理
flag,msg = bank_interface.withdraw_interface(
Login_User,money
)
if flag:
print(msg)
break
else:
print(msg)
# 5、还款功能
@common.login_auth
def repay():
while True:
print("============还款功能============")
money = input("请输入还款的金额").strip()
if not money.isdigit():
print("输入非法,请重新输入")
continue
money = int(money)
if money > 0:
# 将金额交给接口层处理
msg = bank_interface.repay_interface(
Login_User, money
)
print(msg)
break
else:
print("输入的数字必须大于零")
# 6、转账功能
@common.login_auth
def transfer():
while True:
print("============转账功能============")
name = input("请输入转款的账户").strip()
money = input("请输入还款的金额").strip()
if not money.isdigit():
print("输入非法,请重新输入")
continue
money = int(money)
if money > 0:
# 将金额交给接口层处理
flag, msg = bank_interface.transfer_interface(
Login_User,name,money
)
if flag:
print(msg)
break
else:
print(msg)
else:
print("输入的数字必须大于零")
# 7、查看流水
@common.login_auth
def check_flow():
print("============查看流水============")
flow = bank_interface.check_flow_interface(
Login_User
)
if flow:
for i in flow:
print(i)
else:
print("暂无用户流水信息")
# 8、购物功能
@common.login_auth
def shopping():
# 1)商品列表
# shop_list = {
# \'0\':{\'name\':\'上海灌汤包\',\'price\':30},
# }
#[[商品名称,商品单价],[商品名称,商品单价],[商品名称,商品单价]]
shop_list = [
[\'上海灌汤包\',30],
[\'北京灌汤包\',50],
[\'河南豆沙包\',80],
[\'南京流沙包\',110],
]
# \'商品名\':[价格,数量]
# shopping_car = {}
shopping_car = shop_interface.get_shop_car(Login_User)
print("============购物功能============")
for index, shop in enumerate(shop_list):
print(f"商品编号:{index},商品名称:{shop[0]},商品单价:{shop[1]}")
while True:
choice = input("请输入商品编号(输入y结账,n退出):").strip()
#打印
# for shop in shop_list:
# print(shop)
#枚举: enumerate(可迭代对象)->(可迭代对象的索引,对应的值)
#枚举: enumerate(可迭代对象)->(0,[\'上海灌汤包\',30])
if choice == \'y\' or choice == \'Y\':
#调用支付接口
if not shopping_car:
print("购物车是空的,不能支付")
continue
flag,msg = shop_interface.shopping_interface(
Login_User,shopping_car
)
if flag:
print(msg)
break
else:
print(msg)
continue
elif choice == \'n\' or choice == \'N\':
#调用添加购物车接口
#判断购物车内是否有值
if not shopping_car:
print("退出购物功能")
break
flag,msg = shop_interface.add_shop_car_interface(
Login_User,shopping_car
)
if flag:
print(msg)
break
else:
print(msg)
continue
if not choice.isdigit():
print(\'输入编号非法,请重新输入\')
continue
if int(choice) in range(len(shop_list)):
shop_name, shop_price = shop_list[int(choice)]
#加入购物车
if shop_name in shopping_car:
shopping_car.get(shop_name)[1] += 1
else:
shopping_car[shop_name] = [shop_price,1]
else:
print("输入的商品编号不存在")
continue
print(\'当前购物车:\',shopping_car)
#清空购物车功能
# 9、查看购物车
@common.login_auth
def check_shop_car():
print("============查看购物车============")
#调用查看购物车接口
shop_car,sum,sum1 = shop_interface.check_shop_car_interface(Login_User)
for index,v in enumerate(shop_car):
print(f\'序号:{index},商品名:{v[0]},价格:{v[1]},数量:{v[2]}\')
print(f"购物车中总共{sum1}件商品,需要支付{sum}元")
# 10、管理员功能
def admin():
from core import admin
admin.admin_run()
func_dic = {
\'1\': register,
\'2\': login,
\'3\': check_balance,
\'4\': withdraw,
\'5\': repay,
\'6\': transfer,
\'7\': check_flow,
\'8\': shopping,
\'9\': check_shop_car,
\'10\': admin,
\'11\': exit
}
#项目主程序
def run():
while True:
print(
\'\'\'
=========ATM+购物车==========
1、注册功能
2、登录功能
3、查看余额
4、提现功能
5、还款功能
6、转账功能
7、查看流水
8、购物功能
9、查看购物车
10、管理员功能
11、退出程序
========== end =============
\'\'\'
)
choice = input("请输入功能编号").strip()
if choice in func_dic:
func_dic.get(choice)()
else:
print("输入有误")
admin.py
管理员界面
from core import src
from interface import admin_interface
# 添加账户
def add_user():
src.register()
# 修改额度
def change_balance():
while True:
# 输入锁定用户
change_user = input("请输入需要修改额度的用户名:").strip()
# 输入修改额度
money = input("请输入需要修改的用户额度:").strip()
if not money.isdigit():
continue
# 调用额度修改接口
flag, msg = admin_interface.change_balance_interface(
change_user,money
)
if flag:
print(msg)
break
else:
print(msg)
# 冻结账户
def locked_user():
while True:
change_user = input("请输入需要冻结的账户").strip()
flag, msg = admin_interface.locked_user_interface(change_user)
if flag:
print(msg)
break
else:
print(msg)
# 管理员功能字典
admin_func = {
\'1\': [\'添加账户\',add_user],
\'2\': [\'修改额度\',change_balance],
\'3\': [\'冻结账户\',locked_user],
\'4\': [\'退出管理员功能\',],
}
def admin_run():
while True:
print("============管理员功能============")
for x in admin_func:
print(x+\':\'+admin_func.get(x)[0])
choice = input("请输入管理员功能编号:").strip()
if not choice.isdigit():
print("输入非法,请重新输入")
if choice == \'4\':
break
if choice in admin_func:
admin_func[choice][1]()
逻辑接口层
interface/bank_interface.py,
interface/shop_interface.py,
interface/user_interface.py
interface/admin_interface.py
#user_interface.py
\'\'\'
用户接口
\'\'\'
#注册接口
from db import db_handler
from lib import common
user_logger = common.get_logger(\'user\')
def register_interface(username,password,balance = 15000):
\'\'\'
注册逻辑的核心代码
:return:
\'\'\'
#调用数据处理成中的select,返回用户字典或None
user_dic = db_handler.select(username)
password = common.get_pwd_md5(password)
if user_dic:
return False,\'用户名已存在\'
else:
user_dic = {
\'username\': username,
\'password\': password,
\'balance\': balance,
# 用于记录用户流水的列表
\'flow\': [],
# 用户车
\'shop_car\': {},
# 用户冻结状态
\'locked\': False,
}
#保存数据
db_handler.save(user_dic)
msg = f\'{username}注册成功\'
user_logger.info(msg)
return True,msg
def user_interface(username,password):
user_dic = db_handler.select(username)
if not user_dic:
return False,\'用户名不存在\'
else:
if user_dic.get(\'locked\'):
return False,\'用户已被锁定\'
password = common.get_pwd_md5(password)
if password == user_dic.get(\'password\'):
msg = f\'用户:{username}登录成功\'
user_logger.info(msg)
return True , msg
else:
user_logger.warn("密码错误")
return False,"密码错误"
def check_interface(username):
user_dic = db_handler.select(username)
return user_dic.get(\'balance\')
#bank_interface.py
\'\'\'
银行相关逻辑代码
\'\'\'
from db import db_handler
from lib import common
bank_logger = common.get_logger(\'bank\')
#提现接口(手续费)
def withdraw_interface(username,money):
user_dic = db_handler.select(username)
#校验用户的钱是否足够
money2 = int(money)*1.05
balance = int(user_dic.get(\'balance\'))
if balance >= money2:
user_dic[\'balance\'] = balance - money2
flow = f\'用户{username}提现金额:{money},手续费:{money2-float(money)}元\'
user_dic[\'flow\'].append(flow)
db_handler.save(user_dic)
return True , flow
else:
return False , \'提现失败,账户余额不足\'
#还款
def repay_interface(username,money):
user_dic = db_handler.select(username)
# balance = int(user_dic.get(\'balance\'))
user_dic[\'balance\'] += money
flow = f\'用户{username}还款金额:{money}元,可以使用的金额为{user_dic["balance"]}元\'
user_dic[\'flow\'].append(flow)
db_handler.save(user_dic)
return flow
#转账
def transfer_interface(username,name,money):
transfer_dic = db_handler.select(name)
user_dic = db_handler.select(username)
if not transfer_dic:
return False, "转账用户不存在,请重新输入"
# balance = int(user_dic.get(\'balance\'))
# t_balance = int(transfer_dic.get(\'balance\'))
if user_dic.get(\'balance\') >= money:
user_dic[\'balance\'] -= money
transfer_dic[\'balance\'] += money
login_user_flow = f\'转账成功,为账户:{name}转入{money}元\'
to_user_flow = f\'用户{username},为您账户:{name}转入{money}元\'
user_dic[\'flow\'].append(login_user_flow)
transfer_dic[\'flow\'].append(to_user_flow)
db_handler.save(user_dic)
db_handler.save(transfer_dic)
return True, f\'转账成功,为账户:{name}转入{money}元\'
else:
return False, \'转账失败,账户余额不足\'
def check_flow_interface(username):
user_dic = db_handler.select(username)
return user_dic[\'flow\']
def pay_interface(username,money):
user_dic = db_handler.select(username)
if user_dic.get(\'balance\') >= money:
user_dic[\'balance\'] -= money
flow = f\'用户消费金额:{money}元\'
#记录流水
user_dic[\'flow\'].append(flow)
db_handler.save(user_dic)
return True
else:
return False
admin_interface.py
\'\'\'
管理员接口
\'\'\'
from db import db_handler
from lib import common
admin_logger = common.get_logger(\'admin\')
def change_balance_interface(username,money):
user_dic = db_handler.select(username)
if user_dic:
user_dic[\'balance\'] = int(money)
#保存用户数据
db_handler.save(user_dic)
return True,f\'账户{username}额度修改成功\'
else:
return False,\'账户不存在\'
def locked_user_interface(username):
user_dic = db_handler.select(username)
if user_dic:
user_dic[\'locked\'] = True
db_handler.save(user_dic)
return True , f\'账户{username}冻结成功\'
else:
return False, "账户不存在"
#shop_interface.py
\'\'\'
购物商场接口
\'\'\'
from db import db_handler
from lib import common
#根据不同的接口类型,传入不同的日志对象
shop_logger = common.get_logger(\'shop\')
#商品准备结算接口
def shopping_interface(username,shopping_car):
from interface import bank_interface
user_dic = db_handler.select(username)
# if not shopping_car:
# shopping_car = user_dic[\'shop_car\']
money = 0
for _,value in shopping_car.items():
print(value)
price ,num = value
money += price*money
#逻辑判断之后调用银行接口
flag = bank_interface.pay_interface(username,money)
if flag:
user_dic[\'shop_car\'] = {}
db_handler.save(user_dic)
return True,\'支付成功,准备发货\'
else:
return False,\'支付失败,金额不足\'
#添加购物车接口
def add_shop_car_interface(username,shopping_car):
# 获取当前用户的购物车
user_dic = db_handler.select(username)
shop_car = user_dic.get(\'shop_car\')
# print(user_dic)
# print(shop_car)
#shopping_car -->{\'商品名\':[价格,数量]}
for shop_name, price_num in shopping_car.items():
#如果商品存在,则添加商品数量
if shop_name in shop_car:
user_dic[\'shop_car\'][shop_name][1] += price_num[1]
else:
user_dic[\'shop_car\'].update(
{shop_name:price_num}
)
db_handler.save(user_dic)
return True,\'添加购物车成功\'
def check_shop_car_interface(username):
user_dic = db_handler.select(username)
l = []
sum,sum1 = 0,0
for name , price_num in user_dic[\'shop_car\'].items():
sum += price_num[0]
sum1 += price_num[1]
l.append([name,price_num[0],price_num[1]])
return l,sum,sum1
def get_shop_car(username):
user_dic = db_handler.select(username)
return user_dic[\'shop_car\']
程序中的公共部分和装饰器,可以提炼到lib下的common中作为公共方法
lib/common
\'\'\'
公共方法
\'\'\'
import hashlib
from core import src
#密码加盐
def get_pwd_md5(password):
md5_obj = hashlib.md5()
md5_obj.update(password.encode(\'utf-8\'))
slat = \'一二三四五\'
md5_obj.update(slat.encode(\'utf-8\'))
return md5_obj.hexdigest()
#登录认证装饰器
def login_auth(func):
def inner(*args,**kwargs):
if src.Login_User:
res = func(*args,**kwargs)
return res
else:
print("用户未登录")
src.login()
return inner
#添加日志功能:(日志功能在接口层使用)
def get_logger(log_type): # log_type -->user
\'\'\'
:param log_type: 比如是user日志,bank日志,shop日志
:return:
\'\'\'
# 1.加载日志配置信息
logging.config.dictConfig(
setting.LOGGING_DIC
)
# 2.获取日志对象
logger = logging.getLogger()
return logger
数据处理层:对底层数据进行操作的
db/db_handler
\'\'\'
数据处理层
\'\'\'
import json
import os
from conf import setting
def select(username):
#接收接口层传过来的username用户名,拼接路径
user_path = os.path.join(
setting.USER_DATA_PATH, f\'{username}.json\'
)
# 判断用户是否存在
if os.path.exists(user_path):
#打开数据并返回接口层
with open(user_path, \'r\', encoding=\'utf-8\') as f:
user_dic = json.load(f)
return user_dic
# 默认返回NONE
def save(user_dic):
#拼接用户的数据字典
user_path = os.path.join(
setting.USER_DATA_PATH, f\'{user_dic.get("username")}.json\'
)
with open(user_path, \'w\', encoding=\'utf-8\')as f:
json.dump(user_dic, f, ensure_ascii=False)
项目的配置信息
conf/setting
\'\'\'
配置信息
\'\'\'
import os
#获取项目根目录路径
BASE_PATH = os.path.dirname(
os.path.dirname(__file__)
)
#获取项目user_data 文件路径
USER_DATA_PATH = os.path.join(
BASE_PATH,\'db\',\'user_data\'
)
LOG_DATA_PATH = os.path.join(
BASE_PATH,\'log\',\'a1.log\'
)
"""
日志配置字典LOGGING_DIC
"""
# 1、定义三种日志输出格式,日志中可能用到的格式化串如下
# %(name)s Logger的名字
# %(levelno)s 数字形式的日志级别
# %(levelname)s 文本形式的日志级别
# %(pathname)s 调用日志输出函数的模块的完整路径名,可能没有
# %(filename)s 调用日志输出函数的模块的文件名
# %(module)s 调用日志输出函数的模块名
# %(funcName)s 调用日志输出函数的函数名
# %(lineno)d 调用日志输出函数的语句所在的代码行
# %(created)f 当前时间,用UNIX标准的表示时间的浮 点数表示
# %(relativeCreated)d 输出日志信息时的,自Logger创建以 来的毫秒数
# %(asctime)s 字符串形式的当前时间。默认格式是 “2003-07-08 16:49:45,896”。逗号后面的是毫秒
# %(thread)d 线程ID。可能没有
# %(threadName)s 线程名。可能没有
# %(process)d 进程ID。可能没有
# %(message)s用户输出的消息
# 2、强调:其中的%(name)s为getlogger时指定的名字
standard_format = \'%(asctime)s - %(threadName)s:%(thread)d - 日志名字:%(name)s - %(filename)s:%(lineno)d -\' \
\'%(levelname)s - %(message)s\'
simple_format = \'[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s\'
test_format = \'%(asctime)s] %(message)s\'
# 3、日志配置字典
LOGGING_DIC = {
\'version\': 1,
\'disable_existing_loggers\': False,
# 多个日志格式
\'formatters\': {
#定制的日志格式的名字
\'standard\': {
\'format\': standard_format
},
\'simple\': {
\'format\': simple_format
},
\'test\': {
\'format\': test_format
},
},
\'filters\': {},
# handlers是日志的接收者,控制日志的输出位置,不同的handler会将日志输出到不同的位置
\'handlers\': {
#打印到终端的日志
\'console\': {
\'level\': \'DEBUG\',#日志的级别,也可以写成数字
\'class\': \'logging.StreamHandler\', # 打印到屏幕
\'formatter\': \'simple\'
},
#logging.handlers.RotatingFileHandler 轮转
\'default\': {
\'level\': \'DEBUG\',
\'class\': \'logging.handlers.RotatingFileHandler\', # 保存到文件
# \'maxBytes\': 1024*1024*5, # 日志大小 5M
\'maxBytes\': 1000,
\'backupCount\': 5,
\'filename\': LOG_DATA_PATH, # os.path.join(os.path.dirname(os.path.dirname(__file__)),\'log\',\'a2.log\')
\'encoding\': \'utf-8\',
\'formatter\': \'standard\',
},
#打印到文件的日志,收集info及以上的日志
\'other\': {
\'level\': \'DEBUG\',
\'class\': \'logging.FileHandler\', # 保存到文件
\'filename\': \'a2.log\', # os.path.join(os.path.dirname(os.path.dirname(__file__)),\'log\',\'a2.log\')
\'encoding\': \'utf-8\',
\'formatter\': \'test\',
},
},
# loggers是日志的产生者,产生不同级别的日志,产生的日志会传递给handler然后控制输出
\'loggers\': {
#logging.getLogger(__name__)拿到的logger配置
\'kkk\': {
\'handlers\': [\'console\',\'other\'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
\'level\': \'DEBUG\', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
\'propagate\': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
\'终端提示\': {
\'handlers\': [\'console\',], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
\'level\': \'DEBUG\', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
\'propagate\': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
#真对多种相同的输出,靠不同的日志名去区分功能的,可以填\'\'
\'\': {
\'handlers\': [\'default\', ], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
\'level\': \'DEBUG\', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
\'propagate\': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
},
}