怎么在Java中利用多线程批量导入数据
作者
怎么在Java中利用多线程批量导入数据?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
java基本数据类型有哪些
Java的基本数据类型分为:1、整数类型,用来表示整数的数据类型。2、浮点类型,用来表示小数的数据类型。3、字符类型,字符类型的关键字是“char”。4、布尔类型,是表示逻辑值的基本数据类型。
前言:
当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:
大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
数据同步(从第三方接口拉取数据处理后写入自己的数据库)
以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步
数据读取:从数据源读取数据到内存
数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理
而且根据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。
设计思路
由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:
批量读取数据
多线程写入数据
示例
多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。
模拟服务
importjava.util.concurrent.atomic.AtomicLong; /** *数据批量写入用的模拟服务 * *@authorRJH *createat2019-04-01 */ publicclassMockService{ /** *可读取总数 */ privatelongcanReadTotal; /** *写入总数 */ privateAtomicLongwriteTotal=newAtomicLong(0); /** *写入休眠时间(单位:毫秒) */ privatefinallongsleepTime; /** *构造方法 * *@paramcanReadTotal *@paramsleepTime */ publicMockService(longcanReadTotal,longsleepTime){ this.canReadTotal=canReadTotal; this.sleepTime=sleepTime; } /** *批量读取数据接口 * *@paramnum *@return */ publicsynchronizedlongreadData(intnum){ longreadNum; if(canReadTotal>=num){ canReadTotal-=num; readNum=num; }else{ readNum=canReadTotal; canReadTotal=0; } //System.out.println("readdatasize:"+readNum); returnreadNum; } /** *写入数据接口 */ publicvoidwriteData(){ try{ //休眠一定时间模拟写入速度慢 Thread.sleep(sleepTime); }catch(InterruptedExceptione){ e.printStackTrace(); } //写入总数自增 System.out.println("thread:"+Thread.currentThread()+"writedata:"+writeTotal.incrementAndGet()); } /** *获取写入的总数 * *@return */ publiclonggetWriteTotal(){ returnwriteTotal.get(); } }
批量数据处理器
importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; /** *基于线程池的多线程批量写入处理器 *@authorRJH *createat2019-04-01 */ publicclassSimpleBatchHandler{ privateExecutorServiceexecutorService; privateMockServiceservice; /** *每次批量读取的数据量 */ privateintbatch; /** *线程个数 */ privateintthreadNum; publicSimpleBatchHandler(MockServiceservice,intbatch,intthreadNum){ this.service=service; this.batch=batch; //使用固定数目的线程池 this.executorService=Executors.newFixedThreadPool(threadNum); } /** *开始处理 */ publicvoidstartHandle(){ //开始处理的时间 longstartTime=System.currentTimeMillis(); System.out.println("starthandletime:"+startTime); longreadData; while((readData=service.readData(batch))!=0){//批量读取数据,知道读取不到数据才停止 for(longi=0;i<readData;i++){ executorService.execute(()->service.writeData()); } } //关闭线程池 executorService.shutdown(); while(!executorService.isTerminated()){//等待线程池中的线程执行完 } //结束时间 longendTime=System.currentTimeMillis(); System.out.println("endhandletime:"+endTime); //总耗时 System.out.println("totalhandletime:"+(endTime-startTime)+"ms"); //写入总数 System.out.println("totalwritenum:"+service.getWriteTotal()); } }
测试类
publicclassSimpleBatchHandlerTest{ publicstaticvoidmain(String[]args){ //总数 longtotal=100000; //休眠时间 longsleepTime=100; //每次拉取的数量 intbatch=100; //线程个数 intthreadNum=16; MockServicemockService=newMockService(total,sleepTime); SimpleBatchHandlerhandler=newSimpleBatchHandler(mockService,batch,threadNum); handler.startHandle(); } }
运行结果
starthandletime:1554298681755 thread:Thread[pool-1-thread-2,5,main]writedata:1 thread:Thread[pool-1-thread-1,5,main]writedata:2 ...省略部分输出 thread:Thread[pool-1-thread-4,5,main]writedata:100000 endhandletime:1554299330202 totalhandletime:648447ms totalwritenum:100000
看完上述内容,你们掌握怎么在Java中利用多线程批量导入数据的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注恰卡编程网行业资讯频道,感谢各位的阅读!
目录
推荐阅读
0 条评论
本站已关闭游客评论,请登录或者注册后再评论吧~