Blog

Creating Custom AES Functions for SQLAlchemy

July 9, 2015

Creating Custom AES Functions for SQLAlchemy

※ 이 글은 원본이 갱신되었습니다. 원본을 읽어주시면 감사하겠습니다.

 

개인적으로 지금 DB에 자료를 암호화해서 넣은 후 필요한 경우에만 암호화를 풀어서 자료를 열람하거나 쿼리의 조건으로 쓰는 작업을 해야하는데, 이것에 대해서는 Spoqa 기술 블로그에 이미 간단한 방법이 소개되어 있다.
저 글에서는 Flask-SQLAlchemy(로 추측되는 무언가), PyCrypto, MySQL등을 사용해서 저장값이 Float형인 경우만 고려해서 구현했는데, 나는 PostgreSQL을 써야했기에 저 글을 그대로 가져다 쓸 수 없었다.
그래서 가장 처음 한 것이 sqlalchemy.sql.expression.func의 실제 동작에 대한 분석이다.
그리고 문자열이나 Blob형 자료를 다루기 위해서는 padding이 중요한데, padding을 어떻게 해야할지 알아보기 위해 PostgreSQL에서 쓰는 실제 padding algorithm을 소스를 분석하여 알아보았다.
이 글은 앞서 알아본 두 가지 정보를 활용하여 작성되었다.

처음엔 이 글에서 PostgreSQL만 다루려고 하였으나 만약의 사태에 대비하여 MySQL을 쓰는 경우에 대해서도 작성해두었다.
다만 여기에는 치명적인 한계가 있는데 MySQL은 5.6.17 이상에서만 CBC 모드를 지원한다.
즉, 그 이하 버전이거나 MariaDB를 쓰는 경우에는 ECB모드를 사용해야한다. (당연히 보안성이 그만큼 떨어진다.)

이 글은 MySQL과 PostgreSQL 양쪽 모두를 가능한 선에서 최대한 지원하려고 하였으므로 소스코드가 복잡할 것이고, 아직 최적화를 하지 않은 코드가 몇 있어서 (Flask에 아주 약한 의존성이 있다던가) 실제 가져다 쓰려면 조금 수정이 필요할 것이다.

먼저 SA 함수를 만드려면 class를 만들고 그 클래스의 부모 클래스로 sqlalchemy.sql.functions.FunctionElement를 지정해야한다.
그리고 함수의 반환형을 지정해주어야 한다.
나는 encryption한 결과는 LargeBinary형이고(MySQL에선 BLOB, PostgreSQL에선 bytea)로 지정되도록 하고 싶었고, decryption한 결과는 바로 사용 가능하도록 String형으로 하고 싶었다.
(이것은 내가 암호화할 정보들의 거의 대부분이 String이기 때문이기도 하다. 필요에 따라 바꾸자.)
따라서 다음과 같이 만들었다.

from sqlalchemy.sql.functions import FunctionElement


class aes_encrypt(FunctionElement): #  noqa
    name = 'aes_encrypt'
    type = LargeBinary


class aes_decrypt(FunctionElement): #  noqa
    name = 'aes_decrypt'
    type = String

(여기서 # noqaflake8 검사를 통과하기 위한 것이다.)

그 다음에 실제 구현을 만들어야한다.
먼저 (간단한) MySQL 구현부터 만들어보자.
MySQL은 암호화에 aes_encrypt, 복호화에 aes_decrypt 함수를 사용한다. 반환형은 모두 BINARY이다.
나는 복호화한 결과물을 바로 문자열로써 쓰고 싶으므로 복호화한 결과물은 cast(... AS CHAR)도 해주어야 할 것이다.
암호화에는 (당연히) iv(initial vector)가 필요하지만 ECB모드로 사용할 경우도 감안해서 iv를 안넣어도 잘 돌아가도록 만들었다.

from sqlalchemy.ext.compiler import compiles

@compiles(aes_encrypt, 'mysql')
def _aes_encrypt_mysql(element, compiler, **kw):
    try:
        data, key, iv = list(element.clauses)
        return "aes_encrypt({}, {}, {})".format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv)
        )
    except ValueError:
        data, key = list(element.clauses)
        return "aes_encrypt({}, {})".format(
            compiler.process(data),
            compiler.process(key)
        )


@compiles(aes_decrypt, 'mysql')
def _aes_decrypt_mysql(element, compiler, **kw):
    try:
        data, key, iv = list(element.clauses)
        return "cast(aes_decrypt({}, {}, {}) AS CHAR)".format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv)
        )
    except ValueError:
        data, key = list(element.clauses)
        return "cast(aes_decrypt({}, {}) AS CHAR)".format(
            compiler.process(data),
            compiler.process(key)
        )

@compiles decorator는 첫 인자로 해당 함수를, 두번째 인자(optinal)로 dbms 종류를 받는다.
decorate된 함수는 element, compiler, **kw의 인자를 받아야한다.
list(element.clauses)를 하면 실제 함수를 실행했을때의 인자들을 받을 수 있다.
각 인자에 compiler.process함수를 적용해서 넣는 것을 알 수 있는데, 실제로 SA 내부적으로 인자의 종류에 따라 자료형을 변환하는 등의 작업을 하기 때문에 필요하다.

이번엔 PostgreSQL의 구현이다.

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import bindparam


def _pgsql_parse_args(args, compiler):
    args = list(args)
    args_len = len(args)
    if args_len == 4:
        data, key, iv, set_padding_none = args
    elif args_len == 3:
        data, key, iv = args
        set_padding_none = False
    else:
        data, key = args
        iv = None
        set_padding_none = False

    mode_str = 'aes-cbc/pad:{}'
    if set_padding_none:
        mode_str = mode_str.format('none')
    else:
        mode_str = mode_str.format('pkcs')

    mode = compiler.process(bindparam('mode', mode_str, type_=String))

    return data, key, iv, mode


@compiles(aes_encrypt, 'postgresql')
def _aes_encrypt_pgsql(element, compiler, **kw):
    data, key, iv, mode = _pgsql_parse_args(element.clauses, compiler)

    if iv is not None:
        return "encrypt_iv({}, {}, {}, {})".format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv),
            mode,
        )
    else:
        return "encrypt({}, {}, {})".format(
            compiler.process(data),
            compiler.process(key),
            mode,
        )


@compiles(aes_decrypt, 'postgresql')
def _aes_decrypt_pgsql(element, compiler, **kw):
    data, key, iv, mode = _pgsql_parse_args(element.clauses, compiler)

    if iv is not None:
        return (
            "convert_from(decrypt_iv({}, {}, {}, {}), 'UTF8')"
        ).format(
            compiler.process(data),
            compiler.process(key),
            compiler.process(iv),
            mode,
        )
    else:
        return (
            "convert_from(decrypt({}, {}, {}), 'UTF8')"
        ).format(
            compiler.process(data),
            compiler.process(key),
            mode,
        )

PostgreSQL은 AES 암호화에 encrypt, 복호화에 decrypt란 함수를 쓰는데, AES를 사용할 경우 마지막 파라메터로 명시적으로 'aes'라고 넣어줘야 한다.
(왜냐하면 'bf', 즉 Blowfish 알고리즘도 지원하기 때문이다.)
마지막 파라메터는 'aes[-(mode)][/pad:(padding)] 형식인데, 나는 그냥 명시적으로 모두 기입하도록 하였다.
여기서 padding이 나오는데, MySQL은 (뭘 쓰는진 몰라도) 알아서 처리해주는데, PostgreSQL은 명시적으로 padding을 쓰지 않을 수 있으므로 옵션으로 받을 수 있도록 처리해주었다.
(set_padding_noneFalse이면 mode_str.format('none')을 하는 부분을 유심히 보자)
여기서 한가지 의문을 가질 수 있는 부분은 CBC모드면서 iv를 받지 않아도 실행이 되냐는 점인데, 실행이 된다.
PgCrypto가 알아서 내부적으로 iv를 관리한다고 한다.

여기서 또 처음 보는 작업이 나오는데 바로 bindparam이다.
본디 bindparam은 이렇게 쓰라고 있는 것이다.
(SA docs에서 인용했다.)

from sqlalchemy import bindparam

stmt = select([users_table]).
            where(users_table.c.name == bindparam('username'))

result = connection.execute(stmt, username='wendy')

이렇게 하면 SQL이 다음과 같은 형태로 준비되어있다가 실행된다.

SELECT id, name FROM user WHERE name = :username

그런데 나는 이것을 조금 더 이용해먹기로 했다.
값과, 실제 유형까지 미리 주고서 compiler가 활용하도록 한 것이다.
내가 만든 mode_str을 실제 SA에서 사용할 수 있는 자료형으로 만든 것이다.

그런데 두 DBMS 모두 AES 암호화를 사용하기 위한 전제조건이 있다.
MySQL은 mode를 지정해주는 SQL을 실행해줘야 활성화되고, PostgreSQL은 아예 extention을 설치해줘야 한다.
이러한 작업은 DB 접속이 활성화되자마자 진행되어야 하므로 SA의 core event를 사용하기로 하였다.

from flask import current_app
from sqlalchemy import event


def install_specific_dialects_event(engine):
    aes_key = current_app.config['AES_KEY']
    aes_key_bit = len(aes_key)*8
    dialect_name = engine.dialect.name
    if dialect_name == 'mysql':
        @event.listens_for(engine, 'connect')
        def mysql_on_connect(conn, record):
            curr = conn.cursor()
            curr.execute(
                "SET block_encryption_mode = 'aes-{}-cbc';".format(
                    aes_key_bit
                )
            )
            curr.close()
    elif dialect_name == 'postgresql':
        @event.listens_for(engine, 'first_connect')
        def pgsql_on_first_connect(conn, record):
            curr = conn.cursor()
            try:
                curr.execute("create extension pgcrypto;")
            except Exception as e:
                if 'extension "pgcrypto" already exists' in str(e):
                    conn.rollback()
                else:
                    raise e
            curr.close()

여기서 aes_key를 구하는 부분이 있는데, MySQL을 위한 부분이다.
암호화에 사용할 KEY의 길이를 알려줘야 하기 때문이다. 여기선 flask.current_app을 사용했는데 딱히 의무적으로 flask를 써야하는 것은 아니므로 필요에 따라 조정해주자.
core event를 사용할때의 주의사항은 넘겨받는 conn 변수가 session.connection()으로 받는 것과 다른 종류라는 것이다.
따라서 conn.cursor()도 SA에서 넘겨받는 cursor와 다른 동작을 한다.
실제 에러가 났을 경우에 명시적으로 conn.rollback() 해주어야만 한다.
(그러지 않는다면 첫 SQL 실행이 실패할 것이다. 그 뒤부터 실행하는건 또 잘 되므로 혼란스러울 수 있다)

여기서 만든 함수는 engine을 생성해줄때, 즉 create_engine 함수 아래에서 실행해주면 된다.

from sqlalchemy import create_engine


engine = create_engine(db_url)
install_specific_dialects_event(engine)

자, 이제 잘 실행되는지 실험해보자.
(import 하는 파일 경로는 그냥 내가 개인적으로 하고 있는 프로젝트에서 사용중인 것이므로 신경쓰지 말자)
원활한 실행 진행을 위해 create_engine 함수에 echo=True 옵션을 주었다.
(실행은 flask의 app_context 안에서 돌고 있다. Flask-Script에서 기본 제공하는 shell 명령어와 거의 동일한 환경이다. 따라서 engine에 뭔가 설치하는 등의 과정은 생략했다)

먼저 DBMS로 MySQL을 줄 경우이다.
이 경우 주의해야하는 점은 몇몇 MySQL connector들이 default charset으로 latin7을 사용하는 만행을 저지르고 있다.
charset을 꼭 지정해주자.

Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 23 2015, 02:52:03) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from testenv.orm import session
>>> from testenv.sql import aes_encrypt, aes_decrypt
>>> key = b'1234567890123456' * 2
>>> iv = b'abcdefghijklmnop'
>>> session.query(aes_decrypt(aes_encrypt('item4의 AES 테스트 현장', key, iv), key, iv)).one()
2015-07-09 20:33:08,223 INFO sqlalchemy.engine.base.Engine SHOW VARIABLES LIKE 'sql_mode'
2015-07-09 20:33:08,223 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,225 INFO sqlalchemy.engine.base.Engine SELECT DATABASE()
2015-07-09 20:33:08,225 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,228 INFO sqlalchemy.engine.base.Engine show collation where `Charset` = 'utf8' and `Collation` = 'utf8_bin'
2015-07-09 20:33:08,229 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,233 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS CHAR(60)) AS anon_1
2015-07-09 20:33:08,233 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,235 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS CHAR(60)) AS anon_1
2015-07-09 20:33:08,235 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,236 INFO sqlalchemy.engine.base.Engine SELECT CAST('test collated returns' AS CHAR CHARACTER SET utf8) COLLATE utf8_bin AS anon_1
2015-07-09 20:33:08,236 INFO sqlalchemy.engine.base.Engine ()
2015-07-09 20:33:08,238 INFO sqlalchemy.engine.base.Engine SELECT cast(aes_decrypt(aes_encrypt(%s, %s, %s), %s, %s) AS CHAR) AS aes_decrypt_1
2015-07-09 20:33:08,238 INFO sqlalchemy.engine.base.Engine ('item4의 AES 테스트 현장', b'12345678901234561234567890123456', b'abcdefghijklmnop', b'12345678901234561234567890123456', b'abcdefghijklmnop')
('item4의 AES 테스트 현장',)

암호화 후 바로 복호화했는데 원문이 그대로 나왔다.
실제 출력된 SQL을 보면 잘 동작함을 알 수 있다.

이번엔 PostgreSQL에서 실행해보자.

Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 23 2015, 02:52:03) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from testenv.orm import session
>>> from testenv.sql import aes_encrypt, aes_decrypt
>>> key = b'1234567890123456' * 2
>>> iv = b'abcdefghijklmnop'
>>> session.query(aes_decrypt(aes_encrypt('item4의 AES 테스트 현장 PostgreSQL', key, iv), key, iv)).
2015-07-09 20:37:38,445 INFO sqlalchemy.engine.base.Engine select version()
2015-07-09 20:37:38,445 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,448 INFO sqlalchemy.engine.base.Engine select current_schema()
2015-07-09 20:37:38,449 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,452 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2015-07-09 20:37:38,452 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine show standard_conforming_strings
2015-07-09 20:37:38,453 INFO sqlalchemy.engine.base.Engine {}
2015-07-09 20:37:38,456 INFO sqlalchemy.engine.base.Engine SELECT convert_from(decrypt_iv(encrypt_iv(%(aes_encrypt_1)s, %(aes_encrypt_2)s, %(aes_encrypt_3)s, %(mode)s), %(aes_decrypt_2)s, %(aes_decrypt_3)s, %(mode)s), 'UTF8') AS aes_decrypt_1
2015-07-09 20:37:38,456 INFO sqlalchemy.engine.base.Engine {'aes_encrypt_1': 'item4의 AES 테스트 현장 PostgreSQL', 'aes_decrypt_2': <psycopg2.extensions.Binary object at 0x103dcd620>, 'aes_encrypt_3': <psycopg2.extensions.Binary object at 0x103dcda08>, 'mode': 'aes-cbc/pad:pkcs', 'aes_encrypt_2': <psycopg2.extensions.Binary object at 0x103df91e8>, 'aes_decrypt_3': <psycopg2.extensions.Binary object at 0x103df9288>}
('item4의 AES 테스트 현장 PostgreSQL',)

역시 잘 됨을 확인할 수 있다.

이것을 활용하면 맨 처음 인용한 hybrid_property등으로 사용할 수 있을 것이다.
이러한 응용법은 본 글에서 다루려고 한 범위를 벗어나므로 다음번에 다루려고 한다.

사실 이 글을 쓸 때 Oracle에서도 동작하게 해볼까 했으나 문제가 있어서 실행에 옮기진 못했다.
Python용 Oracle connector는 cx_Oracle이란 것인데, 이것의 설치를 위해서는 시스템에 Oracle이 필요하다.
우회법이 없진 않을 것 같지만 별로 그렇게까지 하고 싶진 않았다.
Oracle도 AES를 지원하므로 필요한 사람은 이 글을 응용해서 해봐도 좋지 않을까 싶다.