# -*- coding:utf-8 -*- import clickhouse_driver if __name__ __main__: host 192.168.88.131 port 9000 # 注意不能使用默认的8123 username testacc password test1234 database default # 连接方式1 # conn clickhouse_driver.connect(database database, # user username, # passwordpassword, # host host, # port port) # 连接方式2 connection_str fclickhouse://{username}:{password}{host}:{port}/{database} conn clickhouse_driver.connect(connection_str) cursor conn.cursor() cursor.execute(SHOW TABLES) res cursor.fetchall() print(res) # 输出形如 [(table1,), (test,)] # 删除表 cursor.execute(DROP TABLE IF EXISTS test) print(cursor.fetchall()) # 输出:[] cursor.execute(CREATE TABLE test (x Int32) ENGINE Memory) print(cursor.fetchall()) # 输出:[] cursor.executemany(INSERT INTO test (x) VALUES, [{x: 100}]) print(cursor.rowcount) # 获取execute 产生记录数 输出1 # 插入多条 cursor.executemany(INSERT INTO test (x) VALUES, [[300],[400]]) cursor.executemany(INSERT INTO test (x) VALUES, [[200]]) print(cursor.rowcount) # 输出1 cursor.execute(INSERT INTO test (x) SELECT * FROM system.numbers LIMIT %(limit)s, {limit: 3}) print(cursor.rowcount) # 输出1 cursor.execute(SELECT sum(x) AS sum_value FROM test) print(cursor.rowcount) # 输出1 print(cursor.columns_with_types) # 获取查询列名及类型输出[(sum_value, Int64)] cursor.execute(SELECT * FROM test) print(cursor.rowcount) # 输出5 print(cursor.columns_with_types) # 输出[(x, Int32)] res cursor.fetchall() print(res) # 输出[(100,), (200,), (0,), (1,), (2,)] print(cursor.fetchone()) #输出None ############################# cursor.execute(SELECT * FROM test) print(cursor.fetchone()) # 输出(100,) # 仅取3条 print(cursor.fetchmany(3)) # 输出[(200,), (0,), (1,)] ############################# cursor.execute(SELECT * FROM test) print(cursor.fetchone()) # 输出(100,) print(cursor.fetchall()) # 输出[(200,), (0,), (1,), (2,)] cursor.close() conn.close()说明conn clickhouse_driver.connect(connection_str)connection_strclickhouse://[{username}:{password}]{host}[:{port}][/{database}]其中{database}默认为default类库封装# -*- coding:utf-8 -*- import re import traceback import clickhouse_driver from utils.log import logger class ClickhouseCli: def __init__(self, db_name, db_host, port3306, user, password, connect_timeout15): try: self.dbconn None self.host db_host self.port port self.user user self.passwd password self.db_name db_name self.connect_timeout connect_timeout self.connect_config {host: self.host, port: self.port, user: self.user, password: self.passwd, database: self.db_name, connect_timeout: self.connect_timeout} self.__connect_database() logger.debug(初始化数据库连接成功(数据库%s) % self.db_name) except Exception as e: raise Exception(初始化数据库(%s)连接失败%s % (self.db_name, traceback.format_exc())) def __connect_database(self): self.dbconn clickhouse_driver.connect(**self.connect_config) def insert(self, query, paramsNone): 插入单条数据 示例 :query INSERT INTO test (x) VALUES :params [{x: 100}] :query INSERT INTO test (x) VALUES :params [[100]] :query INSERT INTO test (x) SELECT * FROM system.numbers LIMIT %(limit)s :params {limit: 3} try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库插入操作({query})失败{traceback.format_exc()}) def delete(self, query, paramsNone): 例子 :query ALTER TABLE test DELETE WHERE id %(id)s :params [{id: 1}] 当然也可以把参数放到query中 ALTER TABLE test DELETE WHERE id 2 try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库删除操作({query})失败{traceback.format_exc()}) def update(self, query, paramsNone, auto_optimizeFalse): :auto_optimize 是否自动优化表 True - 是 False - 否 例子 :query ALTER TABLE test UPDATE log_message%(log_message)s WHERE id %(id)s :params {log_message:log message, id: 2} 当然也可以把参数放到query中 ALTER TABLE test UPDATE log_messagelog message WHERE id 2 try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) if auto_optimize: res re.findall(ralter\stable\s(.)\supdate, query, re.I) if len(res) 0: table_name res[0] db_cursor.execute(fOPTIMIZE TABLE {table_name} FINAL) db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库更新操作({query})失败{traceback.format_exc()}) def select(self, query, paramsNone): 查询结果最多只包含一条记录 示例查询获取获取id为2的记录 :query SELECT * FROM test WHERE id %(id)s :param {id: 2} 当然也可以把参数放到query中 SELECT * FROM test WHERE id 2 result [] try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) query_result db_cursor.fetchall() if query_result: result query_result[0] db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库查询操作({query})失败{traceback.format_exc()}) return result def select_many(self, query, paramsNone): 查询查询结果包含多条记录 try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库(%s) % (traceback.format_exc(), self.db_name)) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) query_result db_cursor.fetchall() db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库查询操作({query})失败{traceback.format_exc()}) return query_result def close(self): if self.dbconn: self.dbconn.close() self.dbconn None def __del__(self): self.close() if __name__ __main__: from datetime import datetime db_cli ClickhouseCli(db_nametestdb, db_host192.168.88.131, port9000, usertestacc, passwordtest1234, connect_timeout15) db_cli.insert(INSERT INTO test (id, event_date, log_message) VALUES, [[1, datetime.now(), test message 1]]) db_cli.insert(INSERT INTO test (id, event_date, log_message) VALUES, [[2, datetime.now(), test message 2],[3, datetime.now(), test message 3]]) res db_cli.select_many(SELECT * FROM test) print(res) #输出[(1, datetime.date(2025, 9, 25), test message 1), (2, datetime.date(2025, 9, 25), test message 2), (3, datetime.date(2025, 9, 25), test message 3)] res db_cli.select(SELECT * FROM test WHERE id %(id)s, {id: 2}) print(res) # 输出(2, datetime.date(2025, 9, 25), test message 2) res db_cli.select(SELECT * FROM test WHERE id 2) print(res) db_cli.update(ALTER TABLE test UPDATE log_message %(log_message)s WHERE id %(id)s, {log_message:log_message %s % datetime.now().strftime(%Y%m%d%H%M%S), id: 2}, auto_optimizeTrue) res db_cli.select(SELECT * FROM test WHERE id 2) print(res) db_cli.update(ALTER TABLE test UPDATE log_message %s WHERE id 2 % (log_message %s % datetime.now().strftime(%Y%m%d%H%M%S))) res db_cli.select(SELECT * FROM test WHERE id 2) db_cli.delete(ALTER TABLE test DELETE WHERE id in (%(id)s, %(id2)s), {id: 2, id2: 3})注意误区当前驱动版本下验证使用类似以下代码尝试切换当前数据库至目标数据库test_db然后获取获取test_db数据库
实践环境python3 .9.13clickhouse-driver 0.2.9实践操作
发布时间:2026/7/1 1:45:24
# -*- coding:utf-8 -*- import clickhouse_driver if __name__ __main__: host 192.168.88.131 port 9000 # 注意不能使用默认的8123 username testacc password test1234 database default # 连接方式1 # conn clickhouse_driver.connect(database database, # user username, # passwordpassword, # host host, # port port) # 连接方式2 connection_str fclickhouse://{username}:{password}{host}:{port}/{database} conn clickhouse_driver.connect(connection_str) cursor conn.cursor() cursor.execute(SHOW TABLES) res cursor.fetchall() print(res) # 输出形如 [(table1,), (test,)] # 删除表 cursor.execute(DROP TABLE IF EXISTS test) print(cursor.fetchall()) # 输出:[] cursor.execute(CREATE TABLE test (x Int32) ENGINE Memory) print(cursor.fetchall()) # 输出:[] cursor.executemany(INSERT INTO test (x) VALUES, [{x: 100}]) print(cursor.rowcount) # 获取execute 产生记录数 输出1 # 插入多条 cursor.executemany(INSERT INTO test (x) VALUES, [[300],[400]]) cursor.executemany(INSERT INTO test (x) VALUES, [[200]]) print(cursor.rowcount) # 输出1 cursor.execute(INSERT INTO test (x) SELECT * FROM system.numbers LIMIT %(limit)s, {limit: 3}) print(cursor.rowcount) # 输出1 cursor.execute(SELECT sum(x) AS sum_value FROM test) print(cursor.rowcount) # 输出1 print(cursor.columns_with_types) # 获取查询列名及类型输出[(sum_value, Int64)] cursor.execute(SELECT * FROM test) print(cursor.rowcount) # 输出5 print(cursor.columns_with_types) # 输出[(x, Int32)] res cursor.fetchall() print(res) # 输出[(100,), (200,), (0,), (1,), (2,)] print(cursor.fetchone()) #输出None ############################# cursor.execute(SELECT * FROM test) print(cursor.fetchone()) # 输出(100,) # 仅取3条 print(cursor.fetchmany(3)) # 输出[(200,), (0,), (1,)] ############################# cursor.execute(SELECT * FROM test) print(cursor.fetchone()) # 输出(100,) print(cursor.fetchall()) # 输出[(200,), (0,), (1,), (2,)] cursor.close() conn.close()说明conn clickhouse_driver.connect(connection_str)connection_strclickhouse://[{username}:{password}]{host}[:{port}][/{database}]其中{database}默认为default类库封装# -*- coding:utf-8 -*- import re import traceback import clickhouse_driver from utils.log import logger class ClickhouseCli: def __init__(self, db_name, db_host, port3306, user, password, connect_timeout15): try: self.dbconn None self.host db_host self.port port self.user user self.passwd password self.db_name db_name self.connect_timeout connect_timeout self.connect_config {host: self.host, port: self.port, user: self.user, password: self.passwd, database: self.db_name, connect_timeout: self.connect_timeout} self.__connect_database() logger.debug(初始化数据库连接成功(数据库%s) % self.db_name) except Exception as e: raise Exception(初始化数据库(%s)连接失败%s % (self.db_name, traceback.format_exc())) def __connect_database(self): self.dbconn clickhouse_driver.connect(**self.connect_config) def insert(self, query, paramsNone): 插入单条数据 示例 :query INSERT INTO test (x) VALUES :params [{x: 100}] :query INSERT INTO test (x) VALUES :params [[100]] :query INSERT INTO test (x) SELECT * FROM system.numbers LIMIT %(limit)s :params {limit: 3} try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库插入操作({query})失败{traceback.format_exc()}) def delete(self, query, paramsNone): 例子 :query ALTER TABLE test DELETE WHERE id %(id)s :params [{id: 1}] 当然也可以把参数放到query中 ALTER TABLE test DELETE WHERE id 2 try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库删除操作({query})失败{traceback.format_exc()}) def update(self, query, paramsNone, auto_optimizeFalse): :auto_optimize 是否自动优化表 True - 是 False - 否 例子 :query ALTER TABLE test UPDATE log_message%(log_message)s WHERE id %(id)s :params {log_message:log message, id: 2} 当然也可以把参数放到query中 ALTER TABLE test UPDATE log_messagelog message WHERE id 2 try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) if auto_optimize: res re.findall(ralter\stable\s(.)\supdate, query, re.I) if len(res) 0: table_name res[0] db_cursor.execute(fOPTIMIZE TABLE {table_name} FINAL) db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库更新操作({query})失败{traceback.format_exc()}) def select(self, query, paramsNone): 查询结果最多只包含一条记录 示例查询获取获取id为2的记录 :query SELECT * FROM test WHERE id %(id)s :param {id: 2} 当然也可以把参数放到query中 SELECT * FROM test WHERE id 2 result [] try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库 % traceback.format_exc()) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) query_result db_cursor.fetchall() if query_result: result query_result[0] db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库查询操作({query})失败{traceback.format_exc()}) return result def select_many(self, query, paramsNone): 查询查询结果包含多条记录 try: db_cursor self.dbconn.cursor() except Exception: logger.error(获取数据库游标失败%s正在尝试重新连接数据库(%s) % (traceback.format_exc(), self.db_name)) self.__connect_database() db_cursor self.dbconn.cursor() try: db_cursor.execute(query, params) query_result db_cursor.fetchall() db_cursor.close() except Exception: db_cursor.close() raise Exception(f执行数据库查询操作({query})失败{traceback.format_exc()}) return query_result def close(self): if self.dbconn: self.dbconn.close() self.dbconn None def __del__(self): self.close() if __name__ __main__: from datetime import datetime db_cli ClickhouseCli(db_nametestdb, db_host192.168.88.131, port9000, usertestacc, passwordtest1234, connect_timeout15) db_cli.insert(INSERT INTO test (id, event_date, log_message) VALUES, [[1, datetime.now(), test message 1]]) db_cli.insert(INSERT INTO test (id, event_date, log_message) VALUES, [[2, datetime.now(), test message 2],[3, datetime.now(), test message 3]]) res db_cli.select_many(SELECT * FROM test) print(res) #输出[(1, datetime.date(2025, 9, 25), test message 1), (2, datetime.date(2025, 9, 25), test message 2), (3, datetime.date(2025, 9, 25), test message 3)] res db_cli.select(SELECT * FROM test WHERE id %(id)s, {id: 2}) print(res) # 输出(2, datetime.date(2025, 9, 25), test message 2) res db_cli.select(SELECT * FROM test WHERE id 2) print(res) db_cli.update(ALTER TABLE test UPDATE log_message %(log_message)s WHERE id %(id)s, {log_message:log_message %s % datetime.now().strftime(%Y%m%d%H%M%S), id: 2}, auto_optimizeTrue) res db_cli.select(SELECT * FROM test WHERE id 2) print(res) db_cli.update(ALTER TABLE test UPDATE log_message %s WHERE id 2 % (log_message %s % datetime.now().strftime(%Y%m%d%H%M%S))) res db_cli.select(SELECT * FROM test WHERE id 2) db_cli.delete(ALTER TABLE test DELETE WHERE id in (%(id)s, %(id2)s), {id: 2, id2: 3})注意误区当前驱动版本下验证使用类似以下代码尝试切换当前数据库至目标数据库test_db然后获取获取test_db数据库