
上篇文章介绍了 golang 如何连接 Greenplum,那么本文章来介绍在这几年火的一塌糊涂的 Python 语言如何连接 Greenplum。
Python 连接 Greenplum 数据库较常用的库有 PyGreSQL 和 Psycopg2 两个。Greenplum 的很多脚本都是采用 PyGreSQL 为基础开发的,可见 PyGreSQL 肯定有其独到之处,但是 Psycopg2 这几年似乎在 Postgres 体系中更加流行。本文将会分别介绍这两个库的使用。
PyGre sql
PyGreSQL 是连接 PostgreSQ L的 Python 库,目前最新版本为 PyGreSQL 5.1,支持 PostgreSQL 9.0到11版本,可以对应到 Greenplum 6.x 的版本,如果要支持 Greenplum 4.x 和 5.x 版本,可以选用 PyGreSQL 4.x 版本。
安装
pip install PyGreSQL
示例
#!/usr/bin/env python
import pg
def operate_postgre_tbl_product():
try:
#pgdb_conn = pg.connect(dbname = 'tpc', host = '192.168.103.31', user = 'gpadmin', passwd = '')
pgdb_conn = pg.connect("host=192.168.103.31 port=5432 dbname=tpc user=gpadmin")
except Exception, e:
print e.args[0]
return
sql_desc = "select * from call_center limit 5;"
for row in pgdb_conn.query(sql_desc).dictresult():
print row
pgdb_conn. close ()
if __name__ == '__main__':
operate_postgre_tbl_product()
参考文章
Psycopg2
Psycopg2 库的底层是由C语言封装 PostgreSQL 的标准库C接口库 libpq 实现的,运行速度非常快,它支持大型多线程应用的大量并发 Insert 和 Update 操作,另外它完全兼容 DB API 2.0。
安装
pip install psycopg2
示例
1、简单的增加,查询记录
import psycopg2
import psycopg2.extras
import time
'''
连接数据库
returns:db
'''
def gp_connect():
try:
db = psycopg2.connect(dbname="testdb",
user="gpadmin",
password="gpadmin",
host="10.1.208.42",
port="5432")
# connect()也可以使用一个大的字符串参数,
# 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
return db
except psycopg2.DatabaseError as e:
print("could not connect to Greenplum server",e)
if __name__ == '__main__':
conn = gp_connect()
print(conn)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
conn. commit ()
# 提交到数据库中
print(ret)
ret = cur.execute("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);",(300, "abc'def"))
conn.commit()
# 提交到数据库中
print(cur.rowcount) # 1
# 返回数据库中的行的总数已修改,插入或删除最后 execute*().
ret_sql = cur.mogrify("select * from pg_tables where tablename = %s;", ('gp_test',))
# 返回生成的sql脚本, 用以查看生成的sql是否正确.
# sql脚本必须以;结尾, 不可以省略.其次, 不管sql中有几个参数, 都需要用 % s代替, 只有 % s, 不管值是字符还是数字, 一律 % s.
# 最后, 第二个参数中, 一定要传入元组, 哪怕只有一个元素, 像我刚才的例子一样, ('gp_test')这样是不行的.
print(ret_sql.decode('utf-8')) # select * from pg_tables where tablename = E'gp_test';
cur.execute("select * from gp_test where num = %s;", (300,))
pg_obj = cur.fetchone()
print(pg_obj) # {'id': 1, 'num': 300, 'data': "abc'def"}
conn.close() # 关闭连接
2、批量插入,查询
conn = gp_connect()
print(conn)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
# ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
# conn.commit()
# # 提交到数据库中
# print(ret)
gp_list = []
for i in range(200):
gp_list.append((i,'abc%s'%i))
# print(gp_list)
# 批量提交数据
ret = cur.executemany("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);", gp_list)
conn.commit()
# 提交到数据库中
print(cur.query) # 查看上一条执行的脚本
print(cur.rowcount) # 200
# 返回数据库中的行的总数已修改,插入或删除最后 execute*().
cur.execute("select count(*) num from gp_test")
pg_obj = cur.fetchone()
print(pg_obj) # {'num': 200}
conn.close() # 关闭连接
3、使用连接池,执行高性能的批量插入与查询
import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime
'''
连接数据库
使用 数据库连接池
returns:db
'''
def gp_connect():
try:
simple_conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5,dbname="testdb",
user="gpadmin",
password="gpadmin",
host="10.1.208.42",
port="5432")
# connect()也可以使用一个大的字符串参数,
# 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
# 从数据库连接池获取连接
conn = simple_conn_pool.getconn()
return conn
except psycopg2.DatabaseError as e:
print("could not connect to Greenplum server",e)
if __name__ == '__main__':
conn = gp_connect()
print(conn)
cur = conn.cursor()
# 批量查询大小
batch_size = 1000
gp_list = []
for i in range(2000, 100000):
gp_list.append((i,'abc%s'%i))
# print(gp_list)
# 开始时间
start_time = datetime.now()
# 批量提交数据execute_values性能大于executemany
psycopg2.extras.execute_values(cur, "INSERT INTO public.gp_test (num, data) VALUES %s", gp_list)
conn.commit()
# 提交到数据库中
cur.execute("select * from gp_test order by id")
count = 0
while True:
count = count + 1
# 每次获取时会从上次 游标 的位置开始移动size个位置,返回size条数据
data = cur.fetchmany(batch_size)
# 数据为空的时候中断循环
if not data:
break
else:
print(data[-1]) # 得到最后一条(通过元祖方式返回)
print('获取%s到%s数据成功' % ((count - 1) * batch_size, count * batch_size))
print('insert到fetchmany获取全量数据所用时间:', (datetime.now() - start_time).seconds) # 16s
conn.close() # 关闭连接
4、执行高性能的批量更新与查询
import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime
'''
连接数据库
使用数据库连接池
returns:db
'''
def gp_connect():
……略
if __name__ == '__main__':
conn = gp_connect()
print(conn)
cur = conn.cursor()
# 批量查询大小
batch_size = 1000
gp_uplist = [] # 更新列表
for i in range(2000, 10000):
gp_uplist.append((i,'def%s'%i))
print(gp_uplist)
# 开始时间
start_time = datetime.now()
# 批量提交数据execute_values性能大于executemany
sql = "UPDATE public.gp_test SET data = TEST.data " \
"FROM (VALUES %s) AS TEST(num, data) " \
"WHERE public.gp_test.num = TEST.num"
# 批量更新语句模版 UPDATE TABLE SET TABLE.COL = XX.col
# FROM (VALUES %s) AS XX(id_col,col)
# WHERE TABLE.id_col = XX.id_col
# XX为别名
psycopg2.extras.execute_values(cur, sql, gp_uplist, page_size=100)
print(cur.query)
conn.commit()
# 提交到数据库中
cur.execute("select * from gp_test order by id")
count = 0
while True:
count = count + 1
# 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
data = cur.fetchmany(batch_size)
# 数据为空的时候中断循环
if not data:
break
else:
print(data[-1]) # 得到最后一条(通过元祖方式返回)
print('获取%s到%s数据成功' % ((count - 1) * batch_size, count * batch_size))
print('update到fetchmany获取全量数据所用时间:', (datetime.now() - start_time).seconds) # 16s
conn.close() # 关闭连接
5、使用服务端游标
#逐条处理
with psycopg2.connect(database_connection_string) as conn:
with conn.cursor(name='name_of_cursor') as cursor:
cursor.itersize = 20000
query = "SELECT * FROM ..."
cursor.execute(query)
for row in cursor:
# process row
#2 一次处理多条
while True:
rows = cursor.fetchmany(100)
if len(rows) > 0:
for row in rows:
# process row
else:
break
参考文章
END~