Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
测
测试ci创建视图
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
问题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
银宸大学计算机学院
教师群组
王晓庆-wangxiaoqing
数据库
测试ci创建视图
提交
ae74d756
提交
ae74d756
8月 06, 2020
创建
作者:
王晓庆
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
增加新文件
上级
隐藏空白字符变更
内嵌
并排
正在显示
1 个修改的文件
包含
149 行增加
和
0 行删除
+149
-0
testsql.py
testsql.py
+149
-0
没有找到文件。
testsql.py
0 → 100644
浏览文件 @
ae74d756
#!/bin/env python
# -*- coding: utf-8 -*-
# 忽略一些无关的警告
import
warnings
warnings
.
simplefilter
(
'ignore'
,
DeprecationWarning
)
# 测试python访问数据库
import
pymssql
# 取得连接时会稍微等待一小段时间
from
time
import
sleep
# 输出结果时用到,将检索的结果保存成csv
import
pandas
as
pd
import
sys
# 用正则表达式切割字符串
import
re
# 字体颜色
red
=
lambda
text
:
'
\033
[0;31m'
+
text
+
'
\033
[0m'
green
=
lambda
text
:
'
\033
[0;32m'
+
text
+
'
\033
[0m'
yellow
=
lambda
text
:
'
\033
[0;33m'
+
text
+
'
\033
[0m'
# 验证类型:本地--local_test,线上--ci_test
test_type
=
sys
.
argv
[
1
]
# 查询文件:query.sql,schema_check.sql
sql_file_type
=
sys
.
argv
[
2
]
# 取得sql文件名称
sql_file
=
sql_file_type
+
sys
.
argv
[
3
]
+
".sql"
if
sql_file_type
==
"query"
:
query_result_file
=
"qurey_result"
+
sys
.
argv
[
3
]
+
".txt"
user_output_txt_file
=
"user_query"
+
sys
.
argv
[
3
]
+
"_result.txt"
user_output_csv_file
=
"user_query"
+
sys
.
argv
[
3
]
+
"_result.csv"
else
:
query_result_file
=
"schema_check_result"
+
sys
.
argv
[
3
]
+
".txt"
user_output_txt_file
=
"user_schema_check"
+
sys
.
argv
[
3
]
+
"_result.txt"
user_output_csv_file
=
"user_schema_check"
+
sys
.
argv
[
3
]
+
"_result.csv"
# 可能连接数据库时需要消耗一点时间,这里用循环的方式
def
get_conn
():
# 循环3次,连接数据库
for
i
in
range
(
0
,
2
):
try
:
# 调用远程driver,获取成功即返回
if
i
==
0
:
# 等待5秒后重新获取
sleep
(
5
)
elif
i
==
1
:
# 等待10秒后重新获取
sleep
(
10
)
else
:
# 等待15秒后重新获取
sleep
(
15
)
# 根据平台不同采用不同的连接方式
if
sys
.
platform
==
'linux'
:
# linux操作系统,用户名密码登录
conn
=
pymssql
.
connect
(
host
=
'localhost'
,
user
=
'SA'
,
password
=
'<MyStrong@Passw0rd>'
)
else
:
# windows操作系统,连接端口号
conn
=
pymssql
.
connect
(
host
=
'localhost'
,
server
=
'localhost
\
SQLEXPRESS'
,
port
=
'1433'
,
database
=
'master'
)
return
conn
except
Exception
:
print
(
yellow
(
"请稍等,正在连接数据库..."
))
# 没有启动远程driver,程序终止
print
(
red
(
"连接数据库失败!"
))
exit
(
1
)
# 将sql文件的内容转换成字符串
def
parse_sql
(
filename
):
# 读取 sql 文件文本内容,sqltxt 为 list 类型
sql
=
open
(
filename
,
'r'
,
encoding
=
'utf-8'
)
sqltxt
=
sql
.
readlines
()
# 读取之后关闭文件
sql
.
close
()
# list 转 str
sql
=
""
.
join
(
sqltxt
)
return
re
.
split
(
r'[\r\n]GO|go|Go|gO[\r\n]'
,
sql
)
# 输出sql中的打印信息
def
my_msg_handler
(
msgstate
,
severity
,
srvname
,
procname
,
line
,
msgtext
):
"""
Our custom handler -- It simpy prints a string to stdout assembled from
the pieces of information sent by the server.
"""
print
(
yellow
(
"
%
s"
%
msgtext
.
decode
(
'utf-8'
)))
# 主函数
if
__name__
==
'__main__'
:
# 调用get_conn来取得数据库连接并获取cursor
conn
=
get_conn
()
conn
.
_conn
.
set_msghandler
(
my_msg_handler
)
cursor
=
conn
.
cursor
()
# 只有使用query1.sql查询时才建库、建表
if
sql_file
==
"query1.sql"
:
# 创建测试数据库
if
test_type
==
"ci_test"
:
print
(
yellow
(
"正在使用文件 createdb.sql 创建数据库"
))
else
:
print
(
"正在使用文件 createdb.sql 创建数据库"
)
createdb_sql
=
parse_sql
(
'createdb.sql'
)
conn
.
autocommit
(
True
)
for
line_query
in
createdb_sql
:
cursor
.
execute
(
line_query
)
conn
.
autocommit
(
False
)
# 创建数据库表
if
test_type
==
"ci_test"
:
print
(
yellow
(
"正在使用文件 createtable.sql 创建表结构和插入数据"
))
else
:
print
(
"正在使用文件 createtable.sql 创建表结构和插入数据"
)
createtable_sql
=
parse_sql
(
'createtable.sql'
)
for
line_query
in
createtable_sql
:
cursor
.
execute
(
line_query
)
conn
.
commit
()
# 运行学生代码
if
test_type
==
"ci_test"
:
print
(
yellow
(
"正在使用文件 {0} 进行查询操作,结果保存在文件 {1}"
.
format
(
sql_file
,
user_output_txt_file
)))
query_sql
=
parse_sql
(
sql_file
)
pd
.
set_option
(
'display.max_columns'
,
None
)
n
=
0
for
line_query
in
query_sql
:
n
=
n
+
1
if
n
==
len
(
query_sql
):
# 执行最后一条sql命令查询的时候用pd.read_sql,便于保存结果
df
=
pd
.
read_sql
(
line_query
,
conn
)
# 保存成csv格式,带表头,带每行前的序号,注意编码方式
# csv方便用户查看
df
.
to_csv
(
user_output_csv_file
,
index
=
True
,
header
=
True
,
encoding
=
'UTF-8'
)
# txt用于和答案模板比较
df
.
to_csv
(
user_output_txt_file
,
index
=
False
,
header
=
True
,
encoding
=
'UTF-8'
)
if
test_type
==
"ci_test"
:
print
(
yellow
(
"正在使用文本比较工具diff对比标准查询结果文件 {0} 和本次查询的输出结果文件 {1}"
.
format
(
query_result_file
,
user_output_txt_file
)))
else
:
# 执行sql命令
cursor
.
execute
(
line_query
)
# 连接用完后记得关闭以释放资源
conn
.
close
()
编写
预览
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论