Access 作为一款文件型数据库,与 SQLite 类似,可以作为简单应用解决方案的数据存储。根据 DB-Engines Ranking 的数据库排名数据,MS Access 在 2020年12月的排名为 11 位,不算太小众。 本篇讲解在 Python 语言环境中,如何操作 Access 数据库,代码中将用到 pyodbc, adodbapi 和 pywin32 三个第三方包,这三个包都用 pip 方式安装。
pyodbc
Python 没有专门针对 Access 数据库的驱动,但微软提供了 ODBC 方式,可以访问 Access ,python 有支持 odbc 访问数据库的第三方模块 pyodbc,所以可以通过 pyodbc 作为桥接的工具。
MS Office 有 32 位版本和 64 位版,如果操作系统版本与 Office 版本不一致,使用 pyodbc 的时候可能会遇到错误。假设操作系统是 Windows 64 位,但Office 版本是 32 位的,此时安装 ODBC for MS Access,应该是 32 位的,将出现 pyodbc 找不到 32 位驱动的情况,如果尝试安装 64 位 odbc 驱动,Windows 将提示已经安装 32 位驱动,不允许再安装 64 位驱动。
解决办法是使用微软提供的 Orca 工具(本文的后面的源代码提供了 Orca 工具),修改 64 位 odbc 安装文件的数据表,删除里面的 blockinstallation 限制。先通过下面的网址下载 64 位驱动 (https://www.microsoft.com/en-us/download/details.aspx?id=13255),将下载的 AccessDatabaseEngine_X64.exe 文件解压缩,用 Orca 工具打开里面的文件 AceRedist.msi,找到 launchcondition:
然后再运行 AceRedist.msi,就可以成功安装 64 位的 ODBC 驱动。
pyodbc 符合 python DB-API 2.0 规范。下面是 pyodbc 访问 MS Access 数据库的示例,包括 CRUD 操作,以及有参数 的 sql 语句操作方法。本篇的目的并不是详细讲解 pyodbc 的用法,只演示一个大概的模式。
import pyodbc
import unittest
conn = pyodbc.connect(DSN="msaccess_employees")
cursor = conn.cursor()
class TestPyOdbc(unittest.TestCase):
def test_select(self):
cursor.execute("select * from employees")
result = cursor.fetchall() # result为list类型
for item in result:
print(item) # item为pyodbc.Row类型
def test_insert(self):
sql = """
INSERT INTO employees ( EMP_ID, FIRST_NAME, LAST_NAME, GENDER,
AGE, EMAIL, PHONE_NR, EDUCATION, MARITAL_STAT,NR_OF_CHILDREN )
VALUES ('9001', 'Stone', 'Wang', 'Male', 18, 'stone@126.com', '138xxx', 'Bachelor', 'Married', 2 );
"""
cursor.execute(sql)
conn.commit()
def test_update(self):
sql = "update employees set AGE=20 where EMP_ID=9001"
cursor.execute(sql)
conn.commit()
def test_delete(self):
cursor.execute("delete from employees where EMP_ID=9001")
def test_sql_with_parameter(self):
sql = "select * from employees where EMP_ID=?"
cursor.execute(sql, ['1001'])
print(cursor.fetchone())
if __name__ == "__main__":
unittest.main()
提一下连接字符串 (connection string),如果 Windows 版本与 Office 版本一致,连接字符串可以这样写:
conn = pyodbc.connect("Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};DBQ=<ms_access_db_path>")
版本不一致则只能通过 DSN 来访问,打开 ODBC 数据源 (odbcad32.exe),创建一个新的数据源,选择 MS Access 驱动:
完成数据源的配置界面如下:
adodbapi
adodbapi 库对 ADO 进行了封装,符合 DB-API 2.0 规范。项目在 pypi 的地址:https:///project/adodbapi/,源代码托管在 SourceForge 上面。对 adodbapi 网络上没有太多文档,大家可以参考的主要是该项目提供的 quick reference,本文的源代码中附了 quick reference 文档。
以下是 adodbapi 的使用示例:
import adodbapi
import os
import unittest
def get_current_dir():
"""
获取当前文件夹
"""
return os.path.dirname(os.path.abspath(__file__))
db_file_path = get_current_dir() + r'\db\Employees.accdb'
conn_str = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=%s;" % db_file_path
conn = adodbapi.connect(conn_str)
cursor = conn.cursor()
class TestAdodbapi(unittest.TestCase):
def test_select(self):
cursor.execute("select * from employees;")
result = cursor.fetchall() # result为adodbpai.apibase.SQLRows类型
for item in result:
print(item) # itemadodbpai.apibase.SQLRow类型
def test_insert(self):
sql = """
INSERT INTO employees ( EMP_ID, FIRST_NAME, LAST_NAME,
GENDER, AGE, EMAIL, PHONE_NR, EDUCATION, MARITAL_STAT, NR_OF_CHILDREN)
VALUES ('9001', 'Stone', 'Wang', 'Male', 18, 'stone@126.com', '138xxx', 'Bachelor', 'Married', 2);
"""
cursor.execute(sql)
conn.commit()
def test_update(self):
sql = "update employees set AGE=20 where EMP_ID=9001"
cursor.execute(sql)
conn.commit()
def test_delete(self):
cursor.execute("delete from employees where EMP_ID=9001")
def test_sql_with_paramter(self):
sql = "select * from employees where EMP_ID=?"
cursor.execute(sql, (1001,))
print(cursor.fetchone())
if __name__ == "__main__":
unittest.main()
原生 ADO 组件
ADO 是微软提供的 数据访问技术,我们也可以通过 pywin32 模块操作 ADO 组件,实现对 MS Access 数据库的访问。与 DB-API 2.0 规范相比,ADO 访问数据库的代码相对繁琐,但如果已经比较熟悉 ADO 编程模型,编写数据访问的代码也比较简单,而且我们可以利用 Python 面向对象的方法,对原生 ADO 按自己的需要进行封装,从而简化代码。
ADO 对象模型中,核心的是 Connection, Command 和 RecordSet 三个。Connection 代表与数据库的连接,Command 对象用于执行 SQL 语句,比如插入数据,修改数据等。RecordSet 对象代表从数据库获取的数据,可以用遍历的方式查看数据。RecordSet 对象本身也可以进行数据的插入、修改和删除等操作。
封装 Connection 对象
from win32com.client import Dispatch
class ConnectionWrapper(object):
def __init__(self, conn_str):
self.connection_string = conn_str
def get_connection(self):
conn = Dispatch("ADODB.Connection")
conn.ConnectionString = self.connection_string
return conn
封装 Command 对象
from win32com.client import Dispatch
from . import error_handler
from . import adoconstants
class CommandWrapper(object):
@staticmethod
def execute(conn, sql):
cmd = Dispatch("ADODB.Command")
try:
conn.Open()
cmd.ActiveConnection = conn
cmd.CommandText = sql
cmd.execute()
except Exception as ex:
print(ex)
for err in conn.Errors:
error_handler.print_error(err)
finally:
if conn.State == adoconstants.adStateOpen:
conn.Close()
封装 RecordSet 对象
RecordSetWrapper 对象主要通过 query() 方法获取数据,参数可以是 sql 语句或者 table name。rst_to_list() 方法用于以 list 格式输出结果集,to_excel() 方法用于将 RecordSet 对象导出到 Excel 的工作表。
from win32com.client import Dispatch
from ADOWrapper.adoconstants import *
class RecordSetWrapper(object):
def __init__(self):
pass
@staticmethod
def rst_to_list(recordset):
"""
Convert recordset to list
"""
result = []
if not (recordset.BOF and recordset.EOF):
# header line
header = []
for idx in range(recordset.Fields.Count):
header.append(recordset.Fields(idx).Name)
result.append(header)
# line items
# Python对于数据库的NULL值自动转换成None
recordset.MoveFirst()
while not recordset.EOF:
item = []
for idx in range(recordset.Fields.Count):
item.append(str(recordset.Fields(idx)))
result.append(item)
recordset.MoveNext()
return result
@staticmethod
def query(conn, sql):
rst = Dispatch("ADODB.Recordset")
result = []
try:
if conn.state != adStateOpen:
conn.Open()
rst.Open(sql, conn, adOpenKeyset, adLockReadOnly)
result = RecordSetWrapper.rst_to_list(rst)
except Exception as ex:
print(ex)
for err in conn.Errors:
print(err.Description)
finally:
rst.Close()
conn.Close()
return result
@staticmethod
def get_recordset(conn, sql):
rst = Dispatch("ADODB.Recordset")
conn.Open()
rst.Open(sql, conn, adOpenKeyset, adLockReadOnly)
return rst
@staticmethod
def to_excel(recordset, excel_file, replace = False):
# create excel file
excel_app = Dispatch("Excel.Application")
excel_app.Visible = True
try:
work_book = excel_app.Workbooks.Add()
target_sheet = work_book.ActiveSheet
# copy recordset header
for idx in range(0, recordset.Fields.Count):
target_sheet.cells(1, idx+1).Value = recordset.Fields(idx).Name
# copy recordset lines
target_sheet.Range("A2").CopyFromRecordSet(recordset)
print("导出成功!")
finally:
recordset.Close()
以下是单元测试代码:
from ADOWrapper.ado_command import CommandWrapper
from ADOWrapper.ado_connection import ConnectionWrapper
from ADOWrapper.ado_recordset import RecordSetWrapper
from msaccess_db_file_path import get_access_db_file
import unittest
# 连接MS Access数据库
conn_str = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=%s;" % get_access_db_file()
conn = ConnectionWrapper(conn_str).get_connection()
class TestAdoWrapper(unittest.TestCase):
def test_insert(self):
sql = """
INSERT INTO employees
( EMP_ID, FIRST_NAME, LAST_NAME, GENDER, AGE, EMAIL, PHONE_NR,
EDUCATION, MARITAL_STAT, NR_OF_CHILDREN)
VALUES ('9001', 'Stone', 'Wang', 'Male', 18, 'stone@126.com', '138xxx', 'Bachelor', 'Married', 2 );
"""
CommandWrapper.execute(conn, sql)
def test_update(self):
sql = "UPDATE employees SET AGE=20 WHERE EMP_ID=9001"
CommandWrapper.execute(conn, sql)
def test_delete(self):
CommandWrapper.execute(conn, "DELETE FROM employees WHERE EMP_ID=9001")
def test_query(self):
result = RecordSetWrapper.query(conn, "SELECT * FROM employees")
for record in result:
print(record)
def test_query_table(self):
result = RecordSetWrapper.query(conn, "employees")
for record in result:
print(record)
def test_export_to_excel(self):
rst = RecordSetWrapper.get_recordset(conn, "select * from employees where EMP_ID<1020;")
RecordSetWrapper.to_excel(rst, "D:/employee_output.xlsx")
if __name__ == "__main__":
unittest.main()
原生 RecordSet 对象
原生RecordSet 对象支持数据库的 CRUD 操作,以下代码演示了 Python 使用原生 RecordSet 的方法:
"""
原生的|RecordSet具备CRUD功能,本示例说明其用法
"""
from ADOWrapper.ado_connection import ConnectionWrapper
from ADOWrapper.ado_recordset import RecordSetWrapper
from ADOWrapper.adoconstants import *
from win32com.client import Dispatch
from msaccess_db_file_path import get_access_db_file
import unittest
# 连接MS Access数据库
conn_str = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=%s;" % get_access_db_file()
conn = ConnectionWrapper(conn_str).get_connection()
class TestRecordSet(unittest.TestCase):
def test_list_employees(self):
rst = RecordSetWrapper.get_recordset(conn, "select * from employees")
if rst.BOF and rst.EOF:
return
rst.MoveFirst()
while not rst.EOF:
for idx in range(0, rst.Fields.Count):
print(rst.Fields(idx), end=",")
print()
rst.MoveNext()
rst.Close()
conn.Close()
def test_create_employee(self):
rst = Dispatch("ADODB.Recordset")
conn.Open()
rst.Open('employees', conn, adOpenKeyset, adLockOptimistic)
try:
rst.AddNew()
rst.Fields("EMP_ID").Value = '9002'
rst.Fields("FIRST_NAME").Value = 'Stone'
rst.Fields("LAST_NAME").Value = "Wang"
rst.Update()
print('新增记录成功!')
except Exception as ex:
print(ex)
for err in conn.Errors:
print(err)
finally:
rst.Close()
conn.Close()
def test_modify_employee(self):
rst = Dispatch("ADODB.Recordset")
try:
conn.Open()
sql = "SELECT * FROM employees WHERE EMP_ID=9002"
rst.Open(sql, conn, adOpenKeyset, adLockOptimistic)
if not rst.EOF:
rst.Fields('AGE').Value = 18
rst.Update()
print('修改成功!')
except Exception as ex:
print(ex)
for err in conn.Errors:
print(err)
finally:
rst.Close()
conn.Close()
def test_delete_employee(self):
rst = Dispatch("ADODB.Recordset")
conn.Open()
sql = "SELECT * FROM employees WHERE EMP_ID=9002"
try:
# IMPORTANT: client cursor should be used for deletion
rst.CursorLocation = adUseClient
rst.Open(sql, conn, adOpenKeyset, adLockOptimistic)
if not rst.EOF:
rst.Delete(1) # deleter first row
print('删除成功!')
else:
print('没有找到记录!')
except Exception as ex:
print(ex)
for err in conn.Errors:
print(err)
finally:
rst.Close()
conn.Close()
if __name__ == '__main__':
unittest.main()
sqlalchemy-access
sqlalchemy 并不直接支持 MS Access 数据库,但可以通过安装 sqlalchemy-access 模块来提供支持。sqlalchemy-access 在 pypi 的地址:https:///project/sqlalchemy-access/。安装的方法:
pip install sqlalchemy-access
如果小伙伴对 sqlalchemy 的使用感兴趣,请自行寻找资源学习。本文也只是演示基本的使用。sqlalchemy 的优点是基于 ORM,不需要手工编写 sql 语句,但需要用代码定义 model,这个 model 可以用 sqlacodegen 基于数据库表来自动生成。以下是 model 定义的代码:
from sqlalchemy import Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Employee(Base):
__tablename__ = "employees"
emp_id = Column("EMP_ID", String(255), primary_key=True)
first_name = Column("FIRST_NAME", String(255))
last_name = Column("LAST_NAME", String(255))
gender = Column("GENDER", String(255))
age = Column("AGE", Integer)
email = Column("EMAIL", String(255))
phone = Column("PHONE_NR", String(255))
education = Column("EDUCATION", String(255))
marital_stat = Column("MARITAL_STAT", String(255))
children = Column("NR_OF_CHILDREN", Integer)
def __repr__(self):
return "Employee <{emp_id},{first_name},{last_name},{gender},{age},{email},{phone},{education},{marital_stat},{children}>".format(
emp_id=self.emp_id,
first_name=self.first_name,
last_name=self.last_name,
gender=self.gender,
age=self.age,
email=self.email,
phone=self.phone,
education=self.education,
marital_stat=self.marital_stat,
children=self.children
)
以下是基于 sqlalchemy 增删改查的代码示例:
import unittest
from employee_sqlalchemy.models import Employee
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
engine = create_engine("access+pyodbc://EMP_MSACCESS")
Session = sessionmaker(bind=engine)
session = Session()
class TestSqlalchemy(unittest.TestCase):
def test_query(self):
employees = session.query(Employee).all()
for emp in employees:
print(emp)
def test_query_one(self):
employees = session.query(Employee).filter(Employee.first_name=="Ted").all()
for emp in employees:
print(emp)
def test_insert(self):
new_emp = Employee(
emp_id =9001,
first_name = "Alice",
last_name = "Brown"
)
session.add(new_emp)
session.commit()
session.close()
def test_update(self):
emp = session.query(Employee).get(9001)
if emp is not None:
emp.age = 20
session.commit()
session.close()
def test_delete(self):
emp = session.query(Employee).get(9001)
if emp is not None:
session.delete(emp)
session.commit()
session.close()
if __name__ == "__main__":
unittest.main()
源代码
github - python-using-msaccess-db
参考
Python3.7 pyodbc完美配置访问access数据库
|