提交 0699c356 创建 作者: 王晓庆's avatar 王晓庆

1

上级 288fbdfe
#!/bin/env python
# -*- coding: utf-8 -*-
import difflib
import sys
import argparse
import webbrowser
import os
import filecmp
import subprocess
from colorama import Fore, init, AnsiToWin32
init(wrap=False)
stream = AnsiToWin32(sys.stderr).stream
compResultFile = "comp_result.html"
notebookFile = "notebook.ipynb"
# 读取文件
def read_file(file_name):
try:
file_desc = open(file_name, 'r', encoding='utf-8')
# 读取后按行分割
text = file_desc.read().splitlines()
file_desc.close()
return text
except IOError as error:
print('Read input file Error: {0}'.format(error))
sys.exit()
# 比较时忽略行末的空格和文件末尾的回车换行
def advanced_file_compare(file1, file2):
f1 = open(file1, encoding='utf-8')
f2 = open(file2, encoding='utf-8')
returnVal = 1
str1 = []
for line1 in f1:
line1 = line1.rstrip()
line1 = line1.replace('\n', '')
if len(line1) != 0:
str1.append(line1)
str2 = []
for line2 in f2:
line2 = line2.rstrip()
line2 = line2.replace('\n', '')
if len(line2) != 0:
str2.append(line2)
count = 0
if len(str1) == len(str2):
for line1 in str1:
if line1 != str2[count]: #文件不同
returnVal = 0
break
else:
count = count + 1
else:
returnVal = 0
f1.close()
f2.close()
return returnVal
# 比较两个文件并把结果生成一份html文本
def compare_file(file1, file2, seqNum):
returnVal = 0
if file1 == "" or file2 == "":
print('文件路径不能为空:第一个文件的路径:{0}, 第二个文件的路径:{1} .'.format(file1, file2))
sys.exit()
else:
print("正在将标准答案输出文件 {0} 与用户编写的应用程序输出文件 {1} 进行比较。\n比较结果".format(file1, file2), end=': ')
# if os.path.isfile(file1) and os.path.isfile(file2) and filecmp.cmp(file1, file2):
if os.path.isfile(file1) and os.path.isfile(file2) and advanced_file_compare(file1, file2):
print("文件相同")
print(Fore.GREEN + "Case{0} 验证成功".format(seqNum), file = stream, end='')
print(Fore.WHITE, file = stream)
returnVal = 1
return returnVal
else:
print("文件不同")
print(Fore.RED + "Case{0} 验证失败".format(seqNum), file = stream, end='')
print(Fore.WHITE, file = stream)
text1_lines = read_file(file1)
text2_lines = read_file(file2)
diff = difflib.HtmlDiff() # 创建HtmlDiff 对象
result = diff.make_file(text1_lines, text2_lines) # 通过make_file 方法输出 html 格式的对比结果
# 将结果写入到比较结果文件中
try:
with open(compResultFile, 'a+', encoding="utf-8") as result_file:
promptContent = "<p>Case {0} 验证失败。</br>标准答案输出文件 output{0}.txt(左边)与用户编写的应用程序输出文件 user_output{0}.txt(右边)的比较结果:</p>".format(seqNum)
result = promptContent + result
result_file.write(result)
except IOError as error:
print('写入 html 文件错误:{0}'.format(error))
finally:
return returnVal
if __name__ == "__main__":
print()
print(Fore.YELLOW + "提示:\n1. 启用验证前一定要先保存 sql 文件。\n2. 如果验证程序长时间未结束,说明源代码中可能存在死循环。请停止验证程序,修改源代码后再验证。", file = stream)
print(Fore.WHITE, file = stream)
# 移除比较结果文件
if os.path.isfile(compResultFile):
os.remove(compResultFile)
seqNum = 1
while 1:
outputFile = "output{0}.txt".format(seqNum)
useroutputFile = "user_output{0}.txt".format(seqNum)
if seqNum == 1 and not os.path.isfile(outputFile):
print(Fore.RED + "该项目未提供自动化验证功能", file = stream)
exit(1)
if not os.path.isfile(outputFile):
break
print("\n正在验证 case{0}。\n正在将 python 程序的输出内容保存到文件 {1} 中。".format(seqNum, useroutputFile))
# 将对应的case号当参数传入
runCommand = "python testcases.py {0}".format(seqNum)
print(Fore.RED + runCommand, file = stream)
execResult = os.system(runCommand)
if execResult != 0:
print(Fore.RED + "应用程序异常,返回值:{0}".format(execResult), file = stream)
exit(1)
if os.path.isfile(outputFile) and os.path.isfile(useroutputFile):
if compare_file(outputFile, useroutputFile, seqNum) == 0:
print("正在使用浏览器打开文件比较结果,可帮助用户查找验证失败的原因。")
# 使用浏览器打开比较结果页面
webbrowser.open('file://' + os.path.realpath(compResultFile))
break
seqNum = seqNum + 1
USE TPCH
select TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION from Information_schema.KEY_COLUMN_USAGE where table_name = 'supplier'
USE TPCH
select TABLE_NAME,COLUMN_NAME,ORDINAL_POSITION from Information_schema.KEY_COLUMN_USAGE where table_name = 'partsupp'
# 测试python访问数据库
import pymssql
# 取得连接时会稍微等待一小段时间
from time import sleep
# 输出结果时用到,将检索的结果保存成csv
import pandas as pd
import sys
# 取得sql文件名称
sql_file = "query" + sys.argv[1] + ".sql"
user_output_txt_file = "user_output" + sys.argv[1] + ".txt"
user_output_csv_file = "user_output" + sys.argv[1] + ".csv"
# 可能连接数据库时需要消耗一点时间,这里用循环的方式
def get_conn():
# 循环3次,连接数据库
for i in range(0, 2):
try:
# 调用远程driver,获取成功即返回
if i==0:
# 等待1秒后重新获取
sleep(1)
elif i == 1:
# 等待5秒后重新获取
sleep(5)
else:
# 等待10秒后重新获取
sleep(10)
# 根据平台不同采用不同的连接方式
if sys.platform == 'linux':
# linux操作系统,用户名密码登录
conn = pymssql.connect(host='localhost', user='SA', password='<MyStrong@Passw0rd>')
else:
# windows操作系统,连接端口号
conn = pymssql.connect(host='localhost', server='localhost\SQLEXPRESS', port='55895', database='master')
return conn
except Exception:
print ("正在连接数据库,请稍候...")
# 没有启动远程driver,程序终止
print ("连接数据库失败,请重新操作!")
exit(1)
# 将sql文件的内容转换成字符串
def parse_sql(filename):
# 读取 sql 文件文本内容,sqltxt 为 list 类型
sql = open(filename, 'r', encoding = 'utf8')
sqltxt = sql.readlines()
# 读取之后关闭文件
sql.close()
# list 转 str
sql = "".join(sqltxt)
return sql
# 主函数
if __name__ == '__main__':
# 调用get_conn来取得数据库连接并获取cursor
conn = get_conn()
cursor = conn.cursor()
# 创建测试数据库
createdb_sql = parse_sql('createdb.sql')
conn.autocommit(True)
cursor.execute(createdb_sql)
conn.autocommit(False)
# 创建数据库表
createtable_sql = parse_sql('createtable.sql')
cursor.execute(createtable_sql)
# conn.commit()
# 运行学生代码
query_sql = parse_sql(sql_file)
df = pd.read_sql(query_sql, conn)
# 保存成csv格式,带表头,带每行前的序号,注意编码方式
# csv方便用户查看
df.to_csv(user_output_csv_file, index=True, header=True, encoding='gb18030')
# txt用于和答案模板比较
df.to_csv(user_output_txt_file, index=False, header=True, encoding='UTF-8')
# 连接用完后记得关闭以释放资源
conn.close()
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论