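Code example: batch data import into ClickHouse via JDBC
Batch writes to ClickHouse can also be done through MyBatis, but throughput is poor: only about 300 rows per second, which is far too slow for data sets that routinely reach tens of millions of rows or more. Batch writing through the JDBC driver reaches roughly 10,000 rows per second, a substantial improvement, although this approach requires more code and more attention to detail.
1. Add the dependency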
<!--clickhouse-->
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2-patch8</version>
</dependency>
2. Get a connection
public Connection getClickHouseConnection() {
    String url = "jdbc:clickhouse://192.168.17.81:8123/default";
    Properties props = new Properties();
    // username is assumed to be a field of the enclosing class (for example, injected via @Value)
    props.setProperty("user", username);
    props.setProperty("password", "123456");
    try {
        ClickHouseDataSource dataSource = new ClickHouseDataSource(url, props);
        return dataSource.getConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    // A null return value means the connection could not be opened; callers must check for it
    return null;
}

// Close the connection
public void closeClickHouseConnection(Connection connection) {
    if (connection != null) {
        try {
            connection.close();
        } catch (SQLException e) {
            // ignore
        }
    }
}
3. Build the SQL for the prepared statement
private String buildBatchSql() {
    String baseSql = "INSERT INTO plat_data_center_bill(bill_id, bank_merchno, bank_sub_merchno, ...) ";
    baseSql = baseSql + "VALUES(";
    baseSql = baseSql + "?,?,?,?,?,?,?,?,?,?";
    baseSql = baseSql + ",?,?,?,?,?,?,?,?,?,?";
    baseSql = baseSql + ")";
    return baseSql;
}
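Note that each column's value is replaced by a question mark here: one question mark per column, so the number of placeholders must match the number of columns listed.
4. Batch-insert the data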
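Counting question marks by hand becomes error-prone once a table has dozens of columns. The following is a minimal sketch, not the approach used in this article, that derives the placeholder list from the column list so the two cannot drift apart. The class name `InsertSqlSketch` and the method `buildInsertSql` are hypothetical, and the three column names are only the ones visible in the statement above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: generates one "?" per column instead of typing them by hand.
final class InsertSqlSketch {

    // Returns "INSERT INTO <table>(c1, c2, ...) VALUES(?,?,...)".
    static String buildInsertSql(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(", ", columns);
        // nCopies yields exactly columns.size() placeholders
        String placeholders = String.join(",", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + "(" + columnList + ") VALUES(" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Only the first three columns are shown; the real table has more
        List<String> columns = Arrays.asList("bill_id", "bank_merchno", "bank_sub_merchno");
        System.out.println(buildInsertSql("plat_data_center_bill", columns));
    }
}
```

The table and column names are concatenated directly into the SQL text, so this is only suitable when they come from trusted, hard-coded values.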
private void batchInsertClickHouse(Connection connection, List<PlatDataCenterBill> dataList) {
    String batchSql = buildBatchSql();
    PreparedStatement prepareStatement = null;
    try {
        connection.setAutoCommit(true);
        // Calling conn.setAutoCommit(false) throws a "Transactions are not supported" exception,
        // so conn.commit() cannot be called.
        // Only pst.executeBatch() can be used; ClickHouse commits in the background.
        // In testing, the number of inserted rows was correct.
        prepareStatement = connection.prepareStatement(batchSql);
        for (PlatDataCenterBill bill : dataList) {
            prepareStatement.setLong(1, bill.getBillId());
            setPrepareString(prepareStatement, 2, bill.getBankMerchno());
            setPrepareString(prepareStatement, 3, bill.getBankSubMerchno());
            setPrepareString(prepareStatement, 4, bill.getBankMerchname());
            prepareStatement.setObject(5, bill.getBankDeptid());
            setPrepareString(prepareStatement, 6, bill.getBankDeptances());
            setPrepareString(prepareStatement, 7, bill.getBankDeptname());
            setPrepareString(prepareStatement, 8, bill.getAppId());
            setPrepareString(prepareStatement, 9, bill.getExternalMerchno());
            setPrepareString(prepareStatement, 10, bill.getExternalMerchname());
            ...
            // Queue the current row; nothing is sent until executeBatch()
            prepareStatement.addBatch();
        }
        prepareStatement.executeBatch();
        prepareStatement.clearBatch();
    } catch (Exception e) {
        e.printStackTrace();
        throw new ServiceException("Batch insert failed");
    } finally {
        if (prepareStatement != null) {
            try {
                prepareStatement.close();
            } catch (SQLException e) {
                // ignore
            }
        }
    }
}
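The add-then-execute pattern above can also be written with try-with-resources, which closes the statement automatically and removes the need for the finally block. The following is a rough sketch under stated assumptions, using only standard `java.sql` calls: `BatchInsertSketch`, `RowBinder`, and `insertBatch` are hypothetical names that do not appear in the original code, and the binder stands in for the per-field `setXxx` calls.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical generic variant of the batch insert shown above.
final class BatchInsertSketch {

    // Hypothetical callback that binds one row to the statement's placeholders.
    interface RowBinder<T> {
        void bind(PreparedStatement statement, T row) throws SQLException;
    }

    // Queues every row with addBatch(), sends them with a single executeBatch(),
    // and returns the number of rows that were queued.
    static <T> int insertBatch(Connection connection, String sql, List<T> rows, RowBinder<T> binder)
            throws SQLException {
        // try-with-resources closes the statement even if binding or execution fails
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (T row : rows) {
                binder.bind(statement, row);
                statement.addBatch();
            }
            statement.executeBatch();
            return rows.size();
        }
    }
}
```

Unlike the method above, this sketch lets the `SQLException` propagate to the caller rather than wrapping it, so the original cause is preserved; whether that fits depends on how the surrounding service handles errors.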
Note that a value must be set for every field, even when the field is empty. Also note that date and time fields must be set using a Timestamp:
prepareStatement.setObject(48, new Timestamp(sqlCreateTime.getTime()));
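Because every placeholder needs a value, a date that happens to be null has to be replaced with something before it is converted. The sketch below shows one way to do that; `TimestampSketch` and `toTimestamp` are hypothetical names, and using a caller-supplied fallback is an assumption, since the right default depends on the column definition and on what an absent date should mean in the table.

```java
import java.sql.Timestamp;
import java.util.Date;

// Hypothetical helper for binding java.util.Date values as java.sql.Timestamp.
final class TimestampSketch {

    // Converts value to a Timestamp; when value is null, the fallback is converted instead.
    static Timestamp toTimestamp(Date value, Date fallback) {
        Date effective = (value != null) ? value : fallback;
        return new Timestamp(effective.getTime());
    }

    public static void main(String[] args) {
        // Assumed fallback for this demo: the Unix epoch
        Date epoch = new Date(0L);
        System.out.println(toTimestamp(null, epoch).getTime());
        System.out.println(toTimestamp(new Date(), epoch).getTime());
    }
}
```

With such a helper, the call above could become `prepareStatement.setObject(48, TimestampSketch.toTimestamp(sqlCreateTime, fallback))`, where `fallback` is whatever default the application chooses.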
5. Use a shared helper method to handle null values
public void setPrepareString(PreparedStatement pst, int idx, String value) throws SQLException {
    // Substitute an empty string for null so that every placeholder receives a value
    pst.setString(idx, value != null ? value : "");
}
6. Required imports
import com.clickhouse.jdbc.ClickHouseDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
7. Loop over the data and write it in batches
Connection connection = getClickHouseConnection();
List<PlatDataCenterBill> platBills = pdcBillService.selectPlatDataCenterBillList(pdcbill);
int iCount = 0;
ArrayList<PlatDataCenterBill> billList = new ArrayList<PlatDataCenterBill>();
for (PlatDataCenterBill bill : platBills) {
    billList.add(bill);
    iCount = iCount + 1;
    // Flush every 10,000 rows
    if (iCount % 10000 == 0) {
        System.out.println(billList.size());
        batchInsertClickHouse(connection, billList);
        billList.clear();
    }
}
// Write the remaining rows that did not fill a whole batch
if (billList.size() > 0) {
    batchInsertClickHouse(connection, billList);
    billList.clear();
}
// This is the right place to close the connection: after all batches have been written
closeClickHouseConnection(connection);
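The counter-and-flush loop above amounts to splitting the list into fixed-size chunks plus one smaller remainder. As an alternative sketch, the splitting can be isolated in a small helper, which makes the remainder handling easy to check in isolation; `ChunkSketch` and `chunk` are hypothetical names, and the batch size of 10,000 is taken from the loop above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a list into consecutive chunks of at most chunkSize elements.
final class ChunkSketch {

    static <T> List<List<T>> chunk(List<T> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < source.size(); start += chunkSize) {
            // The last chunk may be shorter than chunkSize
            int end = Math.min(start + chunkSize, source.size());
            // Copy the view so each chunk stays valid if the source list changes later
            chunks.add(new ArrayList<>(source.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            rows.add(i);
        }
        for (List<Integer> part : chunk(rows, 10)) {
            System.out.println(part.size());
        }
    }
}
```

Each chunk could then be passed to `batchInsertClickHouse(connection, chunk)` in turn. Copying every chunk costs extra memory, so for very large result sets the in-place loop above may still be preferable.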
That covers the overall process for batch-importing data.