API接口进行PDF解析

​ 对不同地区的文件信息进行解析,传入文件路径和地区名称,由于这是内部api接口,可以进行替换。

1
2
3
4
5
area = "重庆"
paths = r'文件路径'
#api接口
api_url = "http://xxxxxx"
result = upload_pdf_files(api_url, paths, area)

判断文件路径

​ 对文件路径进行判断,该路径是否存在,写成check_path函数,返回Boolean类型。

1
2
3
4
5
6
7
8
9
10
11
12
def check_path(path):
"""
检查路径是否存在
参数:
path: str,待检查路径
返回值:
True,如果路径合法;False,否则
"""
if not os.path.exists(path):
print(f"路径 {path} 不存在,请检查后重试")
return False
return True

API返回信息进行处理

​ 利用函数进行封装,对文件进操作处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def upload_pdf_files(api_url, paths, area):
"""
批量上传指定路径下的所有 PDF 文件到服务器
参数:
api_url: str,PDF 文件上传接口地址
paths: str,待上传文件夹路径
area: str,城市名称,用于拼接上传接口地址
返回值:
一个包含元组 (data, order_id) 的列表,其中 data 为成功上传单个文件后接口返回的数据,
order_id 为该文件所在文件夹的名称。
"""
global order_id
if not check_path(paths):
return

area_dict = {"重庆": "chongqing", "四川": "sichuan", "深圳": "shenzhen_split"}
city = area
#city = paths.split('\\')[2] # 使用反斜杠来分隔路径,这里需要加反斜杠转义符号
api_url = f"{api_url}/pdf_analysis_{area_dict[area]}"

result_list = []
for root, dirs, files in os.walk(paths):
for dir_name in dirs:
order_id = os.path.join(dir_name)
for file_name in files:
#判断文件后缀是否为pdf文件
if file_name.endswith('.pdf'):
file_path = os.path.join(root, file_name)
# 通过api接口获取信息
result = upload_pdf_file(api_url, file_path)
if result:
result_list.append([order_id, result])

​ 对返回的数据进行处理,将数据插入对应的表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
table_mapping = {"重庆": "", "四川": "_sichuan", "深圳": "_shenzhen"}
# 通过字典,从返回的信息中提取出返回字典key,从该字典中拿到value,在后面进行补全表名
table_mapping2 = {
"result_tax_finance_balance": "finance_balance_info",
"result_tax_finance_profit": "finance_profit_info",
"result_cash_flow": "finance_cash_flow_info",
"result_tax_value_added": "declare_vat_info",
"result_corporate_income_season": "declare_income_season_info",
"result_corporate_income_year": "declare_income_year_info"
}
table_prefix = table_mapping[area]
# 保存没有 table_name 的文件名到列表中
no_table_names = []
table_names = []
dataWriter = DataWriter(area)
for item in result_list:
# 打印文件名 和 post请求的信息
# print(item[1])
for k in item[1][1].keys():
if item[1][1][k]['code'] == '200':
table_name = "csp" + table_prefix + "_etp_" + table_mapping2[k]
print(table_name)
table_names.append(item[1][0])
dataWriter.write(table_name, item[1][0], item[0], item[1][1][k]['data'])
else:
no_table_names.append(item[1][0])

​ 将item[1][1][k]['code']不为200的进行筛选处理,先进行去重操作,原因是由于重复的写入之后追加到error.csv文件中

1
2
3
4
5
6
7
8
9
10
11
12
no_table_names = set(no_table_names)
# 滤出 no_table_names中不在 table_names中的元素
filtered_list = list(filter(lambda x: x not in table_names, no_table_names))
# print(filtered_list)
# 将未命名的文件名写入 error.csv 文件中
with open('./error.csv', 'a+', encoding='utf-8') as f:
for file_name in filtered_list:
f.write(city + ",")
f.write(file_name)
f.write('\n')
f.seek(0)
return result_list

API接口

​ 传递api_url和文件,返回json数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def upload_pdf_file(api_url, file_path):
"""
上传 PDF 文件到服务器
参数:
api_url: str,PDF 文件上传接口地址
file_path: str,待上传 PDF 文件路径
返回值:
成功时返回 True,失败时返回 False
"""
if not os.path.exists(file_path):
print(f"文件 {file_path} 不存在,请检查后重试")
return False

city = file_path.split('\\')[2] # 获取城市名称
path = Path(file_path)
# 正则表达式判断文件名称,传入的参数不同
types = '税务' if re.search("税", path.stem) else '财务'
files = {
"pdf_name": (None, path.stem),
"report_type": (None, types),
# psd字符流
'pdf_stream': (path.name, open('%s' % file_path, 'rb'), 'application/pdf')
}
result = []

try:
response = requests.post(url=api_url, files=files, timeout=3)
time.sleep(1)
if response.status_code == 200:
data = response.json()
result.append(path.name)
result.append(data)
return result
else:
# 将请求失败的文件名称和地区写入error.csv文件
print(f"请求失败:{response.text}")
with open('./error.csv', 'a+', encoding='utf-8') as f:
f.write(city + ",")
f.write(path.name)
f.write('\n')
f.seek(0)
return False
except requests.exceptions.RequestException as e:
print(e)
return False

pymysql使用

​ 通过连接数据库进行数据写入

1
2
3
4
5
6
7
with pymysql.connect(
host='localhost',
port=3306,
user='root',
password='fy0825...',
db='csp_syx_pdf'
) as conn:

​ 通过老师编写的一个 DataWriter类,对数据库进行连接与写入,进行了简单的修改,使用 with 语句来自动管理数据库连接的打开和关闭。在 write 方法中,类根据传入的参数对数据进行了清理,并将其格式化为 MySQL 可以识别的形式,然后执行插入操作,最后提交事务。如果出现任何异常,则回滚事务并打印错误消息。

收获

​ 利用API对pdf文件进行解析,得到 json 格式的数据,其中利用了 os 库对文件进行读取,

​ 用os.walk生成指定树中每个目录的值元组,包括目录路径、子目录列表和文件列表。

root是当前目录的路径。

dirs是当前目录下的子目录列表。

files是当前目录下的文件列表

1
2
3
for root, dirs, files in os.walk(paths):
for dir_name in dirs:
order_id = os.path.join(dir_name)

pathlib模块中的Path类,

Path 对象是 Python3 中用于处理文件和目录路径的类

1
2
3
4
5
6
7
8
#创建名称为path的path类
path = Path(file_path)
#获取文件名称
path.name
# 获取文件所在路径
path.parent
# 获取文件后缀
path.suffix

​ 对于文件的处理更加的灵活,能够对多级目录文件进行读取,利用pymysql对于数据数据库的连接与数据的写入,通过对数据进行判断进行合理的插入。只有写多练对于程序的编写才会更加的熟练。

​ 简单的优化,对将所有税区的文件全部进写入,判断属于哪一个税区,再利用API接口进行解析。

1
2
3
4
5
6
7
8
9
10
11
12
# 定义目标路径
path = r"D:\税区"
# 获取目标路径下所有一级子文件夹的名称
dirs = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))]
# 获取其下所有二级子文件夹的名称
two_dirs = [os.path.join(path, f, subf) for f in dirs for subf in os.listdir(os.path.join(path, f)) if
os.path.isdir(os.path.join(path, f, subf))]
for folder in two_dirs:
print(folder)
area = folder.split('\\')[2]
print(area)
result = upload_pdf_files(api_url, folder, area)