当前位置:首页 > 大数据 > ClickHouse > 正文内容

clickhouse通过jdbc实现批量数据导入代码示例

admin1年前 (2024-01-09)ClickHouse524

    通过mybatis也可以实现批量写入clickhouse,但是效率太低,每秒大概只能写入300条数据,对于动辄上千万的数据或者更多数据,效率就太低了。而采用jdbc驱动的方式进行批量写入,每秒大约写入数据量达到10000条,效率得到大幅度的提升,只是这种方式,需要编写的代码会更多,需要处理的细节也更多。

1. 添加依赖

<!--clickhouse-->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch8</version>
</dependency>

2. 获取连接

public Connection getClickHouseConnection(){
        
	String url = "jdbc:clickhouse://192.168.17.81:8123/default";
 
	Properties props = new Properties();
	props.setProperty("user", username);
	props.setProperty("password", "123456");	
 
	try {
		
		ClickHouseDataSource dataSource = new ClickHouseDataSource(url, props);
		Connection conn = dataSource.getConnection();
		return conn;
	}catch (Exception e)
	{
		e.printStackTrace();
	}
 
	return null;	
}
 
 
 
// 关闭连接
public void closeClickHouseConnection(Connection connection)
{
	if (connection != null) {
		try {
			connection.close();
		} catch (SQLException e) {
			//ignore
		}
	}
}

3. 准备预编译sql

private String buildBatchSql()
{
	String baseSql = "INSERT INTO plat_data_center_bill(bill_id, bank_merchno, bank_sub_merchno, ...) ";
	baseSql = baseSql + "VALUES(";
 
	baseSql = baseSql + "?,?,?,?,?,?,?,?,?,?";
	baseSql = baseSql + ",?,?,?,?,?,?,?,?,?,?";        
 
	baseSql = baseSql + ")";
 
	return baseSql;
}

注意这里的字段对应的值采用问号替换,一个字段就是一个问号。

4. 批量写入数据

private void batchInsertClickHouse(Connection connection, List<PlatDataCenterBill> dataList)
{        
	String batchSql = buildBatchSql();
 
	PreparedStatement prepareStatement = null;
	try {
		 connection.setAutoCommit(true);
		// 执行conn.setAutoCommit(false);会报Transactions are not supported异常
		// 所以不能执行conn.commit();
		// 只能执行pst.executeBatch();由clickhouse进行后台提交。测试时插入数据条数是正确的
		prepareStatement = connection.prepareStatement(batchSql);
		for (PlatDataCenterBill bill : dataList) {
			prepareStatement.setLong(1, bill.getBillId());
 
			setPrepareString(prepareStatement, 2, bill.getBankMerchno());
 
			setPrepareString(prepareStatement,2, bill.getBankMerchno());
			setPrepareString(prepareStatement,3, bill.getBankSubMerchno());
			setPrepareString(prepareStatement,4, bill.getBankMerchname());
			prepareStatement.setObject(5, bill.getBankDeptid());
 
			setPrepareString(prepareStatement,6, bill.getBankDeptances());
			setPrepareString(prepareStatement,7, bill.getBankDeptname());
			setPrepareString(prepareStatement,8, bill.getAppId());
			setPrepareString(prepareStatement,9, bill.getExternalMerchno());
			setPrepareString(prepareStatement,10, bill.getExternalMerchname());
 
			...		
 
			prepareStatement.addBatch();
		}
		prepareStatement.executeBatch();
		prepareStatement.clearBatch();
		
	} catch (Exception e) {
		e.printStackTrace();
		throw new ServiceException("批量写入异常");
	} finally {
		if (prepareStatement != null) {
			try {
				prepareStatement.close();
			} catch (SQLException e) {
				//ignore
			}
		}    
 
	}
}

这里需要注意的是需要对每个字段的值进行设置,即使字段值为空值,也需要设置,另外还需要注意,日期时间类型,需要使用Timestamp进行设置。

prepareStatement.setObject(48, new Timestamp(sqlCreateTime.getTime()));

 5. 为了判断空值,使用统一的方法

public void setPrepareString(PreparedStatement pst, int idx, String value) throws SQLException {
 
	if(value != null) {
		pst.setString(idx, value);
	}
	else  {
		pst.setString(idx, "");
	}
}

6. 相关的依赖

import com.clickhouse.jdbc.ClickHouseDataSource;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
import javax.sql.DataSource;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

7. 循环批量写数据

Connection connection = getClickHouseConnection();
 
 
List<PlatDataCenterBill> platBills = pdcBillService.selectPlatDataCenterBillList(pdcbill);
 
int iCount = 0;
 
ArrayList<PlatDataCenterBill> billList = new ArrayList<PlatDataCenterBill>();
 
for(PlatDataCenterBill bill : platBills)
{
	billList.add(bill);
	iCount = iCount + 1;
 
	if(iCount % 10000 == 0)
	{		
		System.out.println(billList.size());
		batchInsertClickHouse(connection, billList);
 
		billList.clear();
	}
}
 
// 剩余部分数据批量写入
if(billList.size() > 0)
{	
	batchInsertClickHouse(connection, billList);
	billList.clear();
}
 
 
// 这里关闭连接才是正确的
closeClickHouseConnection(connection);

以上就是批量导入数据的大致处理过程。

扫描二维码推送至手机访问。

版权声明:本文由大数据技术发布,如需转载请注明出处。

本文链接:http://mllib.cn/?id=7

分享给朋友:

“clickhouse通过jdbc实现批量数据导入代码示例” 的相关文章

clickhouse和mysql的 clickhouse mysql引擎

clickhouse和mysql的 clickhouse mysql引擎

1.概述clickhouse高级功能之MaterializeMySQL详解2. 介绍目前 MaterializeMySQL database engine 还不支持表级别的同步操作,需要将整个mysql database映射到clickhouse,映射过来的库表会自动创建为ReplacingMerg...

clickhouse创建角色和用户

clickhouse创建角色和用户

clickhouse版本:22.2.2.1更改default账户权限ClickHouse提供了一个default账号,这个账号有所有的权限,但是不能使用SQL驱动方式的访问权限和账户管理。我们需要在配置文件中修改default账户,使其能够通过SQL驱动方式添加角色和用户。默认default用户是没...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。