Python对mysql增删改查以及json数据的简单处理,以及做python resful的简单测试。

pyMysqlJson.py

#!/usr/bin/python3

# encoding:utf-8

import flask,json

from flask import url_for # 进行网页跳转

import os # 用于操作系统文件的依赖库

import re # 引入正则表达式对用户输入进行限制

import pymysql # 连接数据库

from pymysql import cursors # pymysql转json关键参数cursorclass=cursors.DictCursor

from datetime import datetime

from collections import namedtuple

from json import JSONEncoder

# 实例化api,把当前这个python文件当作一个服务,__name__代表当前这个python文件

api = flask.Flask(__name__)

# 初始化数据库连接

# 使用pymysql.connect方法连接本地mysql数据库

db = pymysql.connect(host='xxx.xxx.xxx.xxx', port=3306, user='root',

password='123456', database='test', charset='utf8',cursorclass=cursors.DictCursor)

# 操作数据库,获取db下的cursor对象

cursor = db.cursor()

class DateEncoder(json.JSONEncoder):

def default(self, obj):

if isinstance(obj, datetime):

return obj.strftime("%Y-%m-%d %H:%M:%S")

else:

return json.JSONEncoder.default(self, obj)

# eval()方法二次封装

def eval_str(str_data):

# eval()对特殊值处理

null = ""

true = True

false = False

return eval(str_data)

def is_numeric(s):

if s.startswith("-") or s.startswith("+") or "." in s:

return all(c in "0123456789.+-" for c in s)

else:

return all(c in "0123456789" for c in s)

# 获取复杂嵌套list,json对应的下标(key)值, 可以去到任意值

# 格式:keytag: "2.a" dict_data:[{"a": "111", "b": 222}, "bbbb", {"a": "555", "b": 222}]

def get_nestdict_value(keytag, dict_data):

if not isinstance(dict_data,(dict,list)):

# dict_data = json.loads(dict_data)

dict_data = eval_str(dict_data) # 效果同上

sname = keytag.strip()

obj = scmd = realval = ""

for i in sname.split("."):

if is_numeric(i):

obj = "%s[%s]" % (obj, i)

else:

obj = "%s['%s']" % (obj, i)

scmd = "%s%s" % ("dict_data", obj)

try:

realval = eval(scmd)

except Exception as e:

print (e.message)

return "[Failed]:cmd change error,eval(%s)" % scmd

return realval

def userDecoder(userDict):

return namedtuple('ENTITY', userDict.keys())(*userDict.values())

def addUser(jsonStr):

#from-data格式参数

userEntity = json.loads(jsonStr, object_hook=userDecoder)

try:

sql = "insert into user(user, password,step,total,steptime,type,url,jointime)values(%s,%s,%s,%s,%s,%s,%s,%s)"

cursor.execute(sql, (userEntity.user,userEntity.password,userEntity.step,userEntity.total,userEntity.steptime,userEntity.type,userEntity.url,userEntity.jointime))

insert_result = "Successfully saved User Info."

except Exception as err:

print(err)

insert_result = "Failed to insert User Info."

pass

db.commit()

return insert_result

def updateUser(jsonStr):

#from-data格式参数

userEntity = json.loads(jsonStr, object_hook=userDecoder)

try:

userEntity_update_before = findOneUser(userEntity.user)

# 判断表单转成对象之后的某个属性是否存在。若不存在时则不update mysql

if not hasattr(userEntity, 'user'):

userEntity.user = userEntity_update_before.user

if not hasattr(userEntity, 'password'):

userEntity.password = userEntity_update_before.password

if not hasattr(userEntity, 'step'):

userEntity.step = userEntity_update_before.step

if not hasattr(userEntity, 'total'):

userEntity.total = userEntity_update_before.total

if not hasattr(userEntity, 'steptime'):

userEntity.steptime = userEntity_update_before.steptime

if not hasattr(userEntity, 'type'):

userEntity.type = userEntity_update_before.type

if not hasattr(userEntity, 'url'):

userEntity.url = userEntity_update_before.url

if not hasattr(userEntity, 'jointime'):

userEntity.jointime = userEntity_update_before.jointime

sql = "update user set user=%s,password=%s,step=%s,total=%s,steptime=%s,type=%s,url=%s,jointime=%s where user=%s;"

cursor.execute(sql, (userEntity.user,userEntity.password,userEntity.step,userEntity.total,userEntity.steptime,userEntity.type,userEntity.url,userEntity.jointime,userEntity.user))

update = "Successfully updated User Info."

except Exception as err:

print(err)

update = "Failed to update User Info."

pass

db.commit()

return update

def deleteUser(query):

try:

sql_delete = "delete from user where user='" + query + "';"

cursor.execute(sql_delete)

delete_result = "Successfully deleted User %s." % query

except Exception as err:

print(err)

delete_result = "Failed to delete User Info."

pass

db.commit()

return delete_result

def findOneUser(query):

#from-data格式参数

sql_list = "select * from user where user = %s;"

cursor.execute(sql_list,query)

results = cursor.fetchone()

python_to_json = json.dumps(results,ensure_ascii=False,cls=DateEncoder) # cls=DateEncoder 将字典格式转化为字符串格式

print("python_to_json",python_to_json)

userEntity = json.loads(python_to_json, object_hook=userDecoder)

return userEntity

def findAllUser():

#from-data格式参数

sql_list = "select * from user"

cursor.execute(sql_list)

results = cursor.fetchall()

python_to_json = json.dumps(results,ensure_ascii=False,cls=DateEncoder) # cls=DateEncoder 将字典格式转化为字符串格式

# print(type(python_to_json)) #

jsonList = json.loads(python_to_json)

# print(type(jsonList)) #

return jsonList

#post入参访问方式二:josn格式参数

@api.route('/post',methods=['post'])

def testPost():

#from-data格式参数

sql_list = "select * from user"

cursor.execute(sql_list)

results = cursor.fetchall()

print(results)

print(type(results)) # of oringinal

python_to_json = json.dumps(results,ensure_ascii=False,cls=DateEncoder) # cls=DateEncoder 将字典格式转化为字符串格式

print(python_to_json)

print(type(python_to_json)) #

jsonList = json.loads(python_to_json)

print(type(jsonList)) #

print("List_Len:",len(jsonList))

print("List[0]:",jsonList[0])

count = 0

for list in jsonList:

count += 1

print("List[%d]:"%(count),list)

# Parse JSON into an object with attributes corresponding to dict keys.

userEntity = json.loads(json.dumps(list), object_hook=userDecoder)

print("After Converting JSON Data into Custom Python Object")

print(userEntity.user, userEntity.step)

print(get_nestdict_value("0.user", python_to_json))

str = findAllUser()

print("findAllUser:",str)

userJsonData = '{"user": "15***742064", "password": "xxxxxxxxx", "step": 12600, "total": 18000, "steptime": "2023-03-31 13:00:03", "type": 1, "url":"111","jointime": "2022-06-28 21:15:04"}'

addUser(userJsonData)

ret = findOneUser("15***742064")

print("findOneUser:",ret.user)

# deleteUser("15***742064")

# updateUser(userJsonData)

json_str = json.dumps(python_to_json)

print ("python原始数据:", repr(python_to_json))

print ("json对象:", json_str)

json_str2 = json.dumps(python_to_json)

print ("python原始数据:", repr(python_to_json))

print ("json对象:", json_str2)

# 将json对象转换为python字典

data3 = json.loads(json_str)

print ("python字典:", data3)

usrname = flask.request.json.get('user')

pwd = flask.request.json.get('password')

if usrname and pwd:

if usrname =='1787841***' and pwd =='******':

ren = {'msg':'登录成功','msg_code':200}

else:

ren = {'msg':'用户名或密码错误','msg_code':-1}

else:

ren = {'msg':'用户名或密码为空','msg_code':1001}

return json.dumps(ren,ensure_ascii=False)

@api.route('/get',methods=['get'])

def testGet():

ret = findOneUser("15***742064")

return json.dumps(ret,ensure_ascii=False)

if __name__ == '__main__':

# 启动服务器

#api.debug = True #改了代码后,不用重启,它会自动重启

# 增加session会话保护(任意字符串,用来对session进行加密)

api.secret_key = 'carson'

try:

api.run(port=8888,debug=True,host='127.0.0.1') # 启动服务

except Exception as err:

print(err)

db.close() # 关闭数据库连接

运行pyMysqlJson.py

/usr/bin/python3 /home/yimning/yimning/app/python/pyMysqlJson.py

结果如下:

test post

使用curl发起http post请求,如下:

curl -d '{

"user":"1787841***",

"password":"******",

"brief":"test python json mysql"

}' -H 'Content-Type: application/json' -X POST http://127.0.0.1:8888/post

结果打印如下:

test get

使用curl发起http get请求,如下:

curl http://127.0.0.1:8888/get

结果打印如下:

mysql

databaseName = testtableName = user

DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (

`user` varchar(255) NOT NULL,

`password` varchar(255) NOT NULL,

`step` int DEFAULT NULL,

`total` int DEFAULT NULL,

`steptime` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,

`type` int DEFAULT NULL,

`url` varchar(255) DEFAULT NULL,

`jointime` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,

PRIMARY KEY (`user`,`password`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: