怎么让spark sql写mysql时支持update操作
怎么让spark sql写mysql时支持update操作
这篇“怎么让sparksql写mysql时支持update操作”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“怎么让sparksql写mysql时支持update操作”文章吧。
除了支持:Append、Overwrite、ErrorIfExists、Ignore;还要在支持update操作
1、首先了解背景
spark提供了一个枚举类,用来支撑对接数据源的操作模式
通过源码查看,很明显,spark是不支持update操作的
2、如何让sparkSQL支持update
关键的知识点就是:
我们正常在sparkSQL写数据到mysql的时候:
大概的api是:
dataframe.write.format("sql.execution.customDatasource.jdbc").option("jdbc.driver","com.mysql.jdbc.Driver").option("jdbc.url","jdbc:mysql://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk&autoReconnect=true&failOverReadOnly=false").option("jdbc.db","test").save()
那么在底层中,spark会通过JDBC方言JdbcDialect , 将我们要插入的数据翻译成:
insertintostudent(columns_1,columns_2,...)values(?,?,....)
那么通过方言解析出的sql语句就通过PrepareStatement的executeBatch(),将sql语句提交给mysql,然后数据插入;
那么上面的sql语句很明显,完全就是插入代码,并没有我们期望的 update操作,类似:
UPDATEtable_nameSETfield1=new-value1,field2=new-value2
但是mysql独家支持这样的sql语句:
INSERTINTOstudent(columns_1,columns_2)VALUES('第一个字段值','第二个字段值')ONDUPLICATEKEYUPDATEcolumns_1='呵呵哒',columns_2='哈哈哒';
大概的意思就是,如果数据不存在则插入,如果数据存在,则 执行update操作;
因此,我们的切入点就是,让sparkSQL内部对接JdbcDialect的时候,能够生成这种sql:
INSERTINTO表名称(columns_1,columns_2)VALUES('第一个字段值','第二个字段值')ONDUPLICATEKEYUPDATEcolumns_1='呵呵哒',columns_2='哈哈哒';
3、改造源码前,需要了解整体的代码设计和执行流程
首先是:
dataframe.write
调用write方法就是为了返回一个类:DataFrameWriter
主要是因为DataFrameWriter是sparksql对接外部数据源写入的入口携带类,下面这些内容是给DataFrameWriter注册的携带信息
然后在出发save()操作后,就开始将数据写入;
接下来看save()源码:
在上面的源码里面主要是注册DataSource实例,然后使用DataSource的write方法进行数据写入
实例化DataSource的时候:
defsave():Unit={assertNotBucketed("save")valdataSource=DataSource(df.sparkSession,className=source,//自定义数据源的包路径partitionColumns=partitioningColumns.getOrElse(Nil),//分区字段bucketSpec=getBucketSpec,//分桶(用于hive)options=extraOptions.toMap)//传入的注册信息//mode:插入数据方式SaveMode,df:要插入的数据dataSource.write(mode,df)}
然后就是dataSource.write(mode, df)的细节,整段的逻辑就是:
根据providingClass.newInstance()去做模式匹配,然后匹配到哪里,就执行哪里的代码;
然后看下providingClass是什么:
拿到包路径.DefaultSource之后,程序进入:
那么如果是数据库作为写入目标的话,就会走:dataSource.createRelation,直接跟进源码:
很明显是个特质,因此哪里实现了特质,程序就会走到哪里了;
实现这个特质的地方就是:包路径.DefaultSource , 然后就在这里面去实现数据的插入和update的支持操作;
4、改造源码
根据代码的流程,最终sparkSQL 将数据写入mysql的操作,会进入:包路径.DefaultSource这个类里面;
也就是说,在这个类里面既要支持spark的正常插入操作(SaveMode),还要在支持update;
如果让sparksql支持update操作,最关键的就是做一个判断,比如:
if(isUpdate){sql语句:INSERTINTOstudent(columns_1,columns_2)VALUES('第一个字段值','第二个字段值')ONDUPLICATEKEYUPDATEcolumns_1='呵呵哒',columns_2='哈哈哒';}else{insertintostudent(columns_1,columns_2,...)values(?,?,....)}
但是,在spark生产sql语句的源码中,是这样写的:
没有任何的判断逻辑,就是最后生成一个:
INSERTINTOTABLE(字段1,字段2....)VALUES(?,?...)
所以首要的任务就是 ,怎么能让当前代码支持:ON DUPLICATE KEY UPDATE
可以做个大胆的设计,就是在insertStatement这个方法中做个如下的判断
definsertStatement(conn:Connection,savemode:CustomSaveMode,table:String,rddSchema:StructType,dialect:JdbcDialect):PreparedStatement={valcolumns=rddSchema.fields.map(x=>dialect.quoteIdentifier(x.name)).mkString(",")valplaceholders=rddSchema.fields.map(_=>"?").mkString(",")if(savemode==CustomSaveMode.update){//TODO如果是update,就组装成ONDUPLICATEKEYUPDATE的模式处理s"INSERTINTO$table($columns)VALUES($placeholders)ONDUPLICATEKEYUPDATE$duplicateSetting"}esle{valsql=s"INSERTINTO$table($columns)VALUES($placeholders)"conn.prepareStatement(sql)}}
这样,在用户传递进来的savemode模式,我们进行校验,如果是update操作,就返回对应的sql语句!
所以按照上面的逻辑,我们代码这样写:
这样我们就拿到了对应的sql语句;
但是只有这个sql语句还是不行的,因为在spark中会执行jdbc的prepareStatement操作,这里面会涉及到游标。
即jdbc在遍历这个sql的时候,源码会这样做:
看下makeSetter:
所谓有坑就是:
insertintotable(字段1,字段2,字段3)values(?,?,?)
那么当前在源码中返回的数组长度应该是3:
valsetters:Array[JDBCValueSetter]=rddSchema.fields.map(_.dataType).map(makeSetter(conn,dialect,_)).toArray
但是如果我们此时支持了update操作,既:
insertintotable(字段1,字段2,字段3)values(?,?,?)ONDUPLICATEKEYUPDATE字段1=?,字段2=?,字段3=?;
那么很明显,上面的sql语句提供了6个? , 但在规定字段长度的时候只有3
这样的话,后面的update操作就无法执行,程序报错!
所以我们需要有一个 识别机制,既:
if(isupdate){valnumFields=rddSchema.fields.length*2}else{valnumFields=rddSchema.fields.length}
row[1,2,3] setter(0,1) //index of setter , index of row setter(1,2) setter(2,3) setter(3,1) setter(4,2) setter(5,3)
所以在prepareStatment中的占位符应该是row的两倍,而且应该是类似这样的一个逻辑
因此,代码改造前样子:
改造后的样子:
try{if(supportsTransactions){conn.setAutoCommit(false)//Everythinginthesamedbtransaction.conn.setTransactionIsolation(finalIsolationLevel)}//valstmt=insertStatement(conn,table,rddSchema,dialect)//此处采用最新自己的sql语句,封装成prepareStatementvalstmt=conn.prepareStatement(sqlStmt)println(sqlStmt)/***在mysql中有这样的操作:*INSERTINTOuser_admin_t(_id,password)VALUES('1','第一次插入的密码')*INSERTINTOuser_admin_t(_id,password)VALUES('1','第一次插入的密码')ONDUPLICATEKEYUPDATE_id='UpId',password='upPassword';*如果是下面的ONDUPLICATEKEY操作,那么在prepareStatement中的游标会扩增一倍*并且如果没有update操作,那么他的游标是从0开始计数的*如果是update操作,要算上之前的insert操作**///makeSetter也要适配update操作,即游标问题valisUpdate=saveMode==CustomSaveMode.Updatevalsetters:Array[JDBCValueSetter]=isUpdatematch{casetrue=>valsetters:Array[JDBCValueSetter]=rddSchema.fields.map(_.dataType).map(makeSetter(conn,dialect,_)).toArrayArray.fill(2)(setters).flattencase_=>rddSchema.fields.map(_.dataType)valnumFieldsLength=rddSchema.fields.lengthvalnumFields=isUpdatematch{casetrue=>numFieldsLength*2case_=>numFieldsLengthvalcursorBegin=numFields/2try{varrowCount=0while(iterator.hasNext){valrow=iterator.next()vari=0while(i<numFields){if(isUpdate){//需要判断当前游标是否走到了ONDUPLICATEKEYUPDATEi<cursorBeginmatch{//说明还没走到update阶段casetrue=>//row.isNullAt判空,则设置空值if(row.isNullAt(i)){stmt.setNull(i+1,nullTypes(i))}else{setters(i).apply(stmt,row,i,0)}//说明走到了update阶段casefalse=>if(row.isNullAt(i-cursorBegin)){//pos-offsetstmt.setNull(i+1,nullTypes(i-cursorBegin))setters(i).apply(stmt,row,i,cursorBegin)}}else{if(row.isNullAt(i)){stmt.setNull(i+1,nullTypes(i))}else{setters(i).apply(stmt,row,i,0)}//滚动游标i=i+1}stmt.addBatch()rowCount+=1if(rowCount%batchSize==0){stmt.executeBatch()rowCount=0}if(rowCount>0){stmt.executeBatch()}finally{stmt.close()conn.commit()committed=trueIterator.empty}catch{casee:SQLException=>valcause=e.getNextExceptionif(cause!=null&&e.getCause!=cause){if(e.getCause==null){e.initCause(cause)}else{e.addSuppressed(cause)throwe}finally{if(!committed){//Thestagemustfail.Wegotherethroughanexceptionpath,so//lettheexceptionthroughunlessrollback()orclose()wantto//telltheuseraboutanotherproblem.if(supportsTransactions){conn.rollback()conn.close()}else{//Thestagemustsucceed.Wecannotpropagateanyexceptionclose()mightthrow.try{conn.close()}catch{casee:Exception=>logWarning("Transactionsucceeded,butclosingfailed",e)
//A`JDBCValueSetter`isresponsibleforsettingavaluefrom`Row`intoafieldfor//`PreparedStatement`.Thelastargument`Int`meanstheindexforthevaluetobeset//intheSQLstatementandalsousedforthevaluein`Row`.//PreparedStatement,Row,position,cursorprivatetypeJDBCValueSetter=(PreparedStatement,Row,Int,Int)=>UnitprivatedefmakeSetter(conn:Connection,dialect:JdbcDialect,dataType:DataType):JDBCValueSetter=dataTypematch{caseIntegerType=>(stmt:PreparedStatement,row:Row,pos:Int,cursor:Int)=>stmt.setInt(pos+1,row.getInt(pos-cursor))caseLongType=>stmt.setLong(pos+1,row.getLong(pos-cursor))caseDoubleType=>stmt.setDouble(pos+1,row.getDouble(pos-cursor))caseFloatType=>stmt.setFloat(pos+1,row.getFloat(pos-cursor))caseShortType=>stmt.setInt(pos+1,row.getShort(pos-cursor))caseByteType=>stmt.setInt(pos+1,row.getByte(pos-cursor))caseBooleanType=>stmt.setBoolean(pos+1,row.getBoolean(pos-cursor))caseStringType=>//println(row.getString(pos))stmt.setString(pos+1,row.getString(pos-cursor))caseBinaryType=>stmt.setBytes(pos+1,row.getAs[Array[Byte]](pos-cursor))caseTimestampType=>stmt.setTimestamp(pos+1,row.getAs[java.sql.Timestamp](pos-cursor))caseDateType=>stmt.setDate(pos+1,row.getAs[java.sql.Date](pos-cursor))caset:DecimalType=>stmt.setBigDecimal(pos+1,row.getDecimal(pos-cursor))caseArrayType(et,_)=>//removetypelengthparametersfromendoftypenamevaltypeName=getJdbcType(et,dialect).databaseTypeDefinition.toLowerCase.split("\\(")(0)valarray=conn.createArrayOf(typeName,row.getSeq[AnyRef](pos-cursor).toArray)stmt.setArray(pos+1,array)case_=>(_:PreparedStatement,_:Row,pos:Int,cursor:Int)=>thrownewIllegalArgumentException(s"Can'ttranslatenon-nullvalueforfield$pos")}
以上就是关于“怎么让sparksql写mysql时支持update操作”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注恰卡编程网行业资讯频道。
推荐阅读
-
navicat(for mysql 过期如何解决 Navicat for MySQL如何使用)
NavicatforMySQL如何使用?1.下载NavicatforMySQL软件后。2.在文件里找到navicat.Na...
-
pycharm(mysql 安装教程 学python这条路怎么走)
学python这条路怎么走?如何学习Python?这是很多新手都会问的问题。这时候问问自己,学Python到底想干什么?为了兴趣?...
-
MySQL索引怎么创建和删除
MySQL索引怎么创建和删除这篇文章主要介绍了MySQL索引怎么创...
-
MySQL查看锁的代码怎么写
MySQL查看锁的代码怎么写本文小编为大家详细介绍“MySQL查看...
-
在 PHP 7 中不要做的 10 件事
1.不要使用MySQL_函数这一天终于来了,从此你不仅仅“不应该”使用mysql_函数。PHP7已经把它们从核心...
-
MySQL体系架构,超详细
-
利用PHP访问MySql数据库以及增删改查实例操作
关于利用PHP访问MySql数据库的逻辑操作以及增删改查实例操作PHP访问MySql数据库˂?php//造连...
-
密码攻防系列文章6:服务器MySQL账号扫描及攻击
-
计算机毕业设计php创建mysql数据库
-
PHP动态网站设计试题