1. 数据库读写

数据库是最常用的通过socket连接的软件,多数时候我们写服务,做分析写算法的数据来源都来自数据库,而结果也往往需要放入数据库.最常见的数据库是关系数据库,像标准库自带的sqlite,常见的postgresql,mysql就是关系数据库他们使用统一的操作语言SQL语言进行操作,但不同的数据库对SQL语言的支持并不完全一样.而像hive这样的实现了部分SQL语句的数仓也可以看做是这类数据库的一个扩展.

另一类是非关系数据库,那就比较多样了,比较常见的大致3类:

  • 以redis为代表的键值数据库
  • 以mongodb为代表文档数据库
  • 以influxdb为代表的时间序列数库
  • 以neo4j为代表的图数据库

这些数据库一般并不通用,而是在特定情境下有较大作用,我会介绍我用过的,没用过的也就不介绍了.

1.1. 关系数据库

关系数据库本身接口几乎是一致的,这边以postgresql为例介绍,本文测试的pg使用docker部署.

1.1.1. 同步接口的关系数据库

常见的同步接口关系数据库如下:

对应数据库
sqlite sqlite3标准库
postgresql psycopg2
mysql pymysql
mssql pymssql
hive pyhive

同步接口的关系数据库都是差不多的使用方法

  1. 先创建连接
  2. 创建一个游标
  3. 使用游标对象的.execute(sql)接口写入SQL语句
  4. 使用连接对象的.commit()接口提交sql语句
  5. 使用游标对象的.fetchall()接口获取结果
  6. 使用连接对象的.close()方法关闭连接
import psycopg2
dsn = "host=localhost port=5432 dbname=test user=postgres password=postgres"
sql = '''
    SELECT 
        column_name, table_name, data_type 
    FROM information_schema.columns
    WHERE table_schema='public' and table_name='company';'''
with psycopg2.connect(dsn) as conn:
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS company
           (id INT PRIMARY KEY     NOT NULL,
           name           TEXT    NOT NULL,
           age            INT     NOT NULL,
           address        CHAR(50),
           salary         REAL);''')
    conn.commit()
    c.execute(sql)
    result = c.fetchall()
print(result)
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]

使用peewee做orm

orm是工程上常用的关系型数据库使用方式,使用orm可以让数据库访问这个动作面相对象,获得的数据以及针对数据的操作更加直观,但同时因为毕竟是一层包装,所以也会损失一些性能,而且因为是面向对象操作所以牺牲了灵活性.因此比较适合在业务逻辑上使用(OLTP),对于数据处理的场景(OLAP),

通常我个人比较喜欢使用peewee这个orm.我常用的特性有:

  • 使用数据库的url访问数据库
  • 在未知数据库路径配置的情况下使用代理对象建立映射
  • 在未知表结构只知道表名的情况下获取表对象
  • 在未知表是否存在的情况下安全的建表
  • 使用上下文语法定义事务
  • 使用迭代器访问多条数据

peewee支持的数据有:

  • mysql
  • postgresql
  • sqlite

使用playhouse.db_url.connect的schema可以是:

  • apsw: APSWDatabase
  • mysql: MySQLDatabase
  • mysql+pool: PooledMySQLDatabase
  • postgres: PostgresqlDatabase
  • postgres+pool: PooledPostgresqlDatabase
  • postgresext: PostgresqlExtDatabase
  • postgresext+pool: PooledPostgresqlExtDatabase
  • sqlite: SqliteDatabase
  • sqliteext: SqliteExtDatabase
  • sqlite+pool: PooledSqliteDatabase
  • sqliteext+pool: PooledSqliteExtDatabase

定义表对象

from peewee import Proxy,Model,CharField,DateField

db = Proxy()

class Person(Model):
    name = CharField()
    birthday = DateField()

    class Meta:
        database = db # This model uses the "people.db" database.

连接数据库

from playhouse.db_url import connect
database = connect("postgres+pool://postgres:postgres@localhost:5432/test")

使用sql语句

with database:
        # with samh.execute_sql("DESC cartoon") as cursor:
        #     scheme = cursor.fetchall()
        #names = [i[0] for i in  scheme]
    with database.execute_sql("""
        SELECT column_name, table_name, data_type 
        FROM information_schema.columns
        WHERE table_schema='public' and table_name='company';
        """) as cursor:
        result = cursor.fetchall()
    print(result)
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]

映射Person并创建表

db.initialize(database)
db.create_tables([Person],safe=True)
with database:
        # with samh.execute_sql("DESC cartoon") as cursor:
        #     scheme = cursor.fetchall()
        #names = [i[0] for i in  scheme]
    with database.execute_sql("""
        SELECT column_name, table_name, data_type 
        FROM information_schema.columns
        WHERE table_schema='public' and table_name='person';
        """) as cursor:
        result = cursor.fetchall()
    print(result)
[('id', 'person', 'integer'), ('name', 'person', 'character varying'), ('birthday', 'person', 'date')]

未知表结构的情况下获取表对象

from playhouse.reflection import generate_models
COMPANY = generate_models(database).get("company")
COMPANY._meta.columns
{'id': <AutoField: company.id>,
 'name': <TextField: company.name>,
 'age': <IntegerField: company.age>,
 'address': <CharField: company.address>,
 'salary': <FloatField: company.salary>}

插入多条数据

import datetime

with db.atomic():
    query = Person.insert_many([
        {
        "name":"千万",
        "birthday":datetime.date(2019,3,4)
        },
        {
        "name":"十万",
        "birthday":datetime.date(2018,3,4)
        },
        {
        "name":"百万",
        "birthday":datetime.date(2017,3,4)
        }
    ])
    db.execute(query)

读取多条数据

from playhouse.shortcuts import model_to_dict
for p in Person.select():
    print(model_to_dict(p))
{'id': 1, 'name': '千万', 'birthday': datetime.date(2019, 3, 4)}
{'id': 2, 'name': '十万', 'birthday': datetime.date(2018, 3, 4)}
{'id': 3, 'name': '百万', 'birthday': datetime.date(2017, 3, 4)}
{'id': 4, 'name': '千万', 'birthday': datetime.date(2019, 3, 4)}
{'id': 5, 'name': '十万', 'birthday': datetime.date(2018, 3, 4)}
{'id': 6, 'name': '百万', 'birthday': datetime.date(2017, 3, 4)}

1.1.2. 异步接口的关系数据库

常见的异步接口关系数据库如下:

对应数据库
sqlite aiosqlite
postgresql aiopg
mysql aiomysql
import aiopg
pool = await aiopg.create_pool(dsn)
async with pool.acquire() as conn:
    async with conn.cursor() as cur:
        await cur.execute('''CREATE TABLE IF NOT EXISTS company
           (id INT PRIMARY KEY     NOT NULL,
           name           TEXT    NOT NULL,
           age            INT     NOT NULL,
           address        CHAR(50),
           salary         REAL);''')
        conn.commit()
        await cur.execute(sql)
        result = await cur.fetchall()
print(result)
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]

使用peewee_async将peewee变成异步orm

peewee是基于同步接口的,而异步语法具有传染性,如果使用peewee就会阻塞,好在有一个包peewee_async为我们做好了将其异步化的工作,需要注意的是目前这个包默认安装使用的是peewee 2,而要使用peewee3需要指定版本安装,0.6.0a是一个可以使用的版本

这个包支持的数据库有:

  • mysql
  • postgresql

使用playhouse.db_url.connect的schema可以是:

  • postgres+async
  • postgres+pool+async
  • mysql+async
  • mysql+pool+async
import peewee_async
from playhouse.db_url import connect
from playhouse.reflection import generate_models
database = connect("postgres+pool+async://postgres:postgres@localhost:5432/test")
Person = generate_models(database).get("person")
db_async = peewee_async.Manager(database)
import datetime
async with db_async.atomic():
    rows = [
        {
        "name":"千",
        "birthday":datetime.date(2016,3,4)
        },
        {
        "name":"十",
        "birthday":datetime.date(2015,3,4)
        },
        {
        "name":"百",
        "birthday":datetime.date(2014,3,4)
        }
    ]
    query = Person.insert_many(rows)
    await db_async.execute(query)
from playhouse.shortcuts import model_to_dict
for obj in await db_async.execute(Person.select()):
    print(model_to_dict(obj))
{'id': 1, 'name': '千万', 'birthday': datetime.date(2019, 3, 4)}
{'id': 2, 'name': '十万', 'birthday': datetime.date(2018, 3, 4)}
{'id': 3, 'name': '百万', 'birthday': datetime.date(2017, 3, 4)}
{'id': 4, 'name': '千万', 'birthday': datetime.date(2019, 3, 4)}
{'id': 5, 'name': '十万', 'birthday': datetime.date(2018, 3, 4)}
{'id': 6, 'name': '百万', 'birthday': datetime.date(2017, 3, 4)}
{'id': 7, 'name': '千', 'birthday': datetime.date(2016, 3, 4)}
{'id': 8, 'name': '十', 'birthday': datetime.date(2015, 3, 4)}
{'id': 9, 'name': '百', 'birthday': datetime.date(2014, 3, 4)}
{'id': 10, 'name': '千', 'birthday': datetime.date(2016, 3, 4)}
{'id': 11, 'name': '十', 'birthday': datetime.date(2015, 3, 4)}
{'id': 12, 'name': '百', 'birthday': datetime.date(2014, 3, 4)}

peewee-async的一处bug

至少在在0.6.0a版本peewee-async有一处bug,就是无法设置connect_timeout这个参数无法设置,我们可以为其打个猴子补丁

import asyncio
import peewee_async
from peewee_async import TaskLocals
async def connect_async(self, loop=None, timeout=None):
    """Set up async connection on specified event loop or
    on default event loop.
    """
    if self.deferred:
        raise Exception("Error, database not properly initialized "
                        "before opening connection")

    if self._async_conn:
        return
    elif self._async_wait:
        await self._async_wait
    else:
        self._loop = loop
        self._async_wait = asyncio.Future(loop=self._loop)

        connect_params = dict(self.connect_params_async)
        timeout = connect_params.pop('connect_timeout', timeout)
        conn = self._async_conn_cls(
            database=self.database,
            loop=self._loop,
            timeout=timeout,
            **connect_params)

        try:
            await conn.connect()
        except Exception as e:
            if not self._async_wait.done():
                self._async_wait.set_exception(e)
            self._async_wait = None
            raise
        else:
            self._task_data = TaskLocals(loop=self._loop)
            self._async_conn = conn
            self._async_wait.set_result(True)
peewee_async.AsyncDatabase.connect_async = connect_async
database = connect("postgres+pool+async://postgres:postgres@localhost:5432/test?connect_timeout=10")
Person = generate_models(database).get("user")
db_async = peewee_async.Manager(database)
async with db_async.atomic():
    rows = [
        {
        "name":"千",
        "age":12
        },
        {
        "name":"十",
        "age":12
        },
        {
        "name":"百",
        "age":13
        }
    ]
    query = Person.insert_many(rows)
    await db_async.execute(query)

1.2. 键值对内存数据库Redis

除了传统关系型数据库,业务上最常见的恐怕就是redis了.redis实际上分为两种:

  • 单机模式 其默认端口为6379
  • 集群模式

这两者在使用上并不完全一样,集群模式无法使用需要全局扫key的操作,比如keys这种.

Redis的命令很多这边不做过多介绍,可以看官方文档.redis支持5种数据结构

  • 字符串
  • 列表
  • 哈希表(python中的字典)
  • 集合
  • 有序集合

他们具体的操作可以看这个文档

redis因为其带着数据结构所以有不少邪道用法,具体的可以看我的这篇博客

1.2.1. Redis的同步接口

单机版本Redis使用包redis-py来连接,它自带一个连接池.需要注意的是从redis中取出的值时bytes类型

import redis
REDIS_URL = "redis://localhost"
redis_client = redis.from_url(REDIS_URL)# 本质上是个连接池

redis_client.execute_command("SET","a","a")
True
redis_client.execute_command("GET","a")
b'a'

集群版本的redis需要使用redis-py-cluster来访问,需要注意的是目前它依赖于2.0版本的redis-py

from rediscluster import StrictRedisCluster

startup_nodes = [
    {"host": host1, "port": port1},
    {"host": host2, "port": port2},
    {"host": host3, "port": port3}
]
rc = StrictRedisCluster(startup_nodes=startup_nodes, skip_full_coverage_check=True,password=password)
rc.set(key,value)

1.2.2. Redis异步接口

在异步接口方面redis有两个比较好的包:

  • aioredis 用的最多的一个包,但目前只支持单机redis

  • aredis 一个用C包aredis封装的异步redis客户端,接口很丰富性能也强,作者是个国人,支持单机redis和集群,但用的人相对少而且由于是个人开发所以更新不算频繁

aioredis

import aioredis

REDIS_URL = "redis://localhost"

redis_pool = await aioredis.create_pool(REDIS_URL)
async with redis_pool.get() as conn:
    result = await conn.execute("GET", "a")
result
b'a'

aredis

aredis使用StrictRedis类连接单机redis,使用StrictRedisCluster连接redis集群,其他的操作都是一样的

from aredis import StrictRedis

client = StrictRedis.from_url(REDIS_URL)

result = await client.execute_command('GET','a')
result
b'a'

1.3. 时间序列数库influxdb

influxdb是目前最流行的时间序列数据库,它支持类似sql语言InfluxQL的特殊语法进行操作,也可以使用http接口发起请求,因此简单好用.

influxdb默认端口为8086

influxdb的同步接口

influxdb同步接口可以使用包influxdb

from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost',port=8086,database='test')
json_body = [
    {
        "measurement": "cpu_load_short",
        "tags": {
            "host": "server01",
            "region": "us-west"
        },
        "time": "2009-11-10T23:00:00Z",
        "fields": {
            "value": 0.64
        }
    }
]
client.create_database('test')
client.write_points(json_body)
resp = client.query("""SHOW MEASUREMENTS""")
resp
ResultSet({'('measurements', None)': [{'name': 'cpu_load_short'}]})

influxdb的异步接口

异步接口使用aioinflux它其实只是封装了influxdb的RESTful接口.但个人认为用起来更好用

from aioinflux import InfluxDBClient
point = {
    'time': '2009-11-10T23:00:00Z',
    'measurement': 'cpu_load_short',
    'tags': {'host': 'server01',
             'region': 'us-west'},
    'fields': {'value': 0.64}
}


async with InfluxDBClient(db='testdb') as client:
    await client.create_database(db='testdb')
    await client.write(point)
    resp = await client.query('SELECT value FROM cpu_load_short')
resp
{'results': [{'statement_id': 0,
   'series': [{'name': 'cpu_load_short',
     'columns': ['time', 'value'],
     'values': [[1257894000000000000, 0.64]]}]}]}

1.4. 图数据库ArangoDB

arangodb是一个开源的图数据库,它支持一种类似SQL的语法AQL同时也可以使用RESTful接口请求.

ArangoDB默认端口为8529,自带一个相当美观好用的web服务,我们可以在其上进行很多操作.

arangodb的同步接口

arangodb只有封装好的同步接口python-arango

from arango import ArangoClient

# 初始化连接
client = ArangoClient(protocol='http', host='localhost', port=8529)

# 以root用户连接到默认数据库`_system`
sys_db = client.db('_system', username='root', password='hsz')

# 创建`test`数据库.
sys_db.create_database('test')
True

arangodb可以像一般文档数据库一样使用

# 使用test数据库
db = client.db('test', username='root', password='hsz')

# 创建集合,也就是相当于关系数据库中的表
db.create_collection('students')

students = db.collection('students')
# 建立索引
students.add_hash_index(fields=['name'], unique=True)

students.insert({'name': 'jane', 'age': 39})
students.insert({'name': 'josh', 'age': 18})
students.insert({'name': 'judy', 'age': 21})

cursor = db.aql.execute('FOR doc IN students RETURN doc')
student_names = [document['name'] for document in cursor]
print(student_names)
# 删除集合
db.delete_collection('students')
['jane', 'josh', 'judy']





True
# 创建图`school`
db.create_graph('school')

# 使用图`school`
graph = db.graph('school')
# 为图创建节点集合--类似关系数据库中的表
students = graph.create_vertex_collection('students')
lectures = graph.create_vertex_collection('lectures')


# 为图创建边
register = graph.create_edge_definition(
    edge_collection='register',# 边集合
    from_vertex_collections=['students'],
    to_vertex_collections=['lectures']
)

# 插入节点数据
students.insert({'_key': '01', 'full_name': 'Anna Smith'})
students.insert({'_key': '02', 'full_name': 'Jake Clark'})
students.insert({'_key': '03', 'full_name': 'Lisa Jones'})

lectures.insert({'_key': 'MAT101', 'title': 'Calculus'})
lectures.insert({'_key': 'STA101', 'title': 'Statistics'})
lectures.insert({'_key': 'CSC101', 'title': 'Algorithms'})

# 插入边数据
register.insert({'_from': 'students/01', '_to': 'lectures/MAT101'})
register.insert({'_from': 'students/01', '_to': 'lectures/STA101'})
register.insert({'_from': 'students/01', '_to': 'lectures/CSC101'})
register.insert({'_from': 'students/02', '_to': 'lectures/MAT101'})
register.insert({'_from': 'students/02', '_to': 'lectures/STA101'})
register.insert({'_from': 'students/03', '_to': 'lectures/CSC101'})

# 广度优先遍历图.
result = graph.traverse(
    start_vertex='students/01',
    direction='outbound',
    strategy='breadthfirst'
)


db.delete_collection('register')
db.delete_collection('students')
db.delete_collection('lectures')
db.delete_graph('school')
True
Copyright © hsz 2017 all right reserved,powered by Gitbook该文件修订时间: 2019-07-11 22:59:03

results matching ""

    No results matching ""