怎么让spark sql写mysql时支持update操作

怎么让spark sql写mysql时支持update操作

这篇“怎么让sparksql写mysql时支持update操作”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“怎么让sparksql写mysql时支持update操作”文章吧。

除了支持:Append、Overwrite、ErrorIfExists、Ignore;还要在支持update操作

1、首先了解背景

spark提供了一个枚举类,用来支撑对接数据源的操作模式

通过源码查看,很明显,spark是不支持update操作的

2、如何让sparkSQL支持update

关键的知识点就是:

我们正常在sparkSQL写数据到mysql的时候:

大概的api是:

dataframe.write.format("sql.execution.customDatasource.jdbc").option("jdbc.driver","com.mysql.jdbc.Driver").option("jdbc.url","jdbc:mysql://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk&autoReconnect=true&failOverReadOnly=false").option("jdbc.db","test").save()

那么在底层中,spark会通过JDBC方言JdbcDialect , 将我们要插入的数据翻译成:

insertintostudent(columns_1,columns_2,...)values(?,?,....)

那么通过方言解析出的sql语句就通过PrepareStatement的executeBatch(),将sql语句提交给mysql,然后数据插入;

那么上面的sql语句很明显,完全就是插入代码,并没有我们期望的 update操作,类似:

UPDATEtable_nameSETfield1=new-value1,field2=new-value2

但是mysql独家支持这样的sql语句:

INSERTINTOstudent(columns_1,columns_2)VALUES('第一个字段值','第二个字段值')ONDUPLICATEKEYUPDATEcolumns_1='呵呵哒',columns_2='哈哈哒';

大概的意思就是,如果数据不存在则插入,如果数据存在,则 执行update操作;

因此,我们的切入点就是,让sparkSQL内部对接JdbcDialect的时候,能够生成这种sql:

INSERTINTO表名称(columns_1,columns_2)VALUES('第一个字段值','第二个字段值')ONDUPLICATEKEYUPDATEcolumns_1='呵呵哒',columns_2='哈哈哒';

3、改造源码前,需要了解整体的代码设计和执行流程

首先是:

dataframe.write

调用write方法就是为了返回一个类:DataFrameWriter

主要是因为DataFrameWriter是sparksql对接外部数据源写入的入口携带类,下面这些内容是给DataFrameWriter注册的携带信息

然后在出发save()操作后,就开始将数据写入;

接下来看save()源码:

在上面的源码里面主要是注册DataSource实例,然后使用DataSource的write方法进行数据写入

实例化DataSource的时候:

defsave():Unit={assertNotBucketed("save")valdataSource=DataSource(df.sparkSession,className=source,//自定义数据源的包路径partitionColumns=partitioningColumns.getOrElse(Nil),//分区字段bucketSpec=getBucketSpec,//分桶(用于hive)options=extraOptions.toMap)//传入的注册信息//mode:插入数据方式SaveMode,df:要插入的数据dataSource.write(mode,df)}

然后就是dataSource.write(mode, df)的细节,整段的逻辑就是:

根据providingClass.newInstance()去做模式匹配,然后匹配到哪里,就执行哪里的代码;

然后看下providingClass是什么:

拿到包路径.DefaultSource之后,程序进入:

那么如果是数据库作为写入目标的话,就会走:dataSource.createRelation,直接跟进源码:

很明显是个特质,因此哪里实现了特质,程序就会走到哪里了;

实现这个特质的地方就是:包路径.DefaultSource , 然后就在这里面去实现数据的插入和update的支持操作;

4、改造源码

根据代码的流程,最终sparkSQL 将数据写入mysql的操作,会进入:包路径.DefaultSource这个类里面;

也就是说,在这个类里面既要支持spark的正常插入操作(SaveMode),还要在支持update;

如果让sparksql支持update操作,最关键的就是做一个判断,比如:

if(isUpdate){sql语句:INSERTINTOstudent(columns_1,columns_2)VALUES('第一个字段值','第二个字段值')ONDUPLICATEKEYUPDATEcolumns_1='呵呵哒',columns_2='哈哈哒';}else{insertintostudent(columns_1,columns_2,...)values(?,?,....)}

但是,在spark生产sql语句的源码中,是这样写的:

没有任何的判断逻辑,就是最后生成一个:

INSERTINTOTABLE(字段1,字段2....)VALUES(?,?...)

所以首要的任务就是 ,怎么能让当前代码支持:ON DUPLICATE KEY UPDATE

可以做个大胆的设计,就是在insertStatement这个方法中做个如下的判断

definsertStatement(conn:Connection,savemode:CustomSaveMode,table:String,rddSchema:StructType,dialect:JdbcDialect):PreparedStatement={valcolumns=rddSchema.fields.map(x=>dialect.quoteIdentifier(x.name)).mkString(",")valplaceholders=rddSchema.fields.map(_=>"?").mkString(",")if(savemode==CustomSaveMode.update){//TODO如果是update,就组装成ONDUPLICATEKEYUPDATE的模式处理s"INSERTINTO$table($columns)VALUES($placeholders)ONDUPLICATEKEYUPDATE$duplicateSetting"}esle{valsql=s"INSERTINTO$table($columns)VALUES($placeholders)"conn.prepareStatement(sql)}}

这样,在用户传递进来的savemode模式,我们进行校验,如果是update操作,就返回对应的sql语句!

所以按照上面的逻辑,我们代码这样写:

这样我们就拿到了对应的sql语句;

但是只有这个sql语句还是不行的,因为在spark中会执行jdbc的prepareStatement操作,这里面会涉及到游标。

即jdbc在遍历这个sql的时候,源码会这样做:

看下makeSetter:

所谓有坑就是:

insertintotable(字段1,字段2,字段3)values(?,?,?)

那么当前在源码中返回的数组长度应该是3:

valsetters:Array[JDBCValueSetter]=rddSchema.fields.map(_.dataType).map(makeSetter(conn,dialect,_)).toArray

但是如果我们此时支持了update操作,既:

insertintotable(字段1,字段2,字段3)values(?,?,?)ONDUPLICATEKEYUPDATE字段1=?,字段2=?,字段3=?;

那么很明显,上面的sql语句提供了6个? , 但在规定字段长度的时候只有3

这样的话,后面的update操作就无法执行,程序报错!

所以我们需要有一个 识别机制,既:

if(isupdate){valnumFields=rddSchema.fields.length*2}else{valnumFields=rddSchema.fields.length}

row[1,2,3] setter(0,1) //index of setter , index of row setter(1,2) setter(2,3) setter(3,1) setter(4,2) setter(5,3)

所以在prepareStatment中的占位符应该是row的两倍,而且应该是类似这样的一个逻辑

因此,代码改造前样子:

改造后的样子:

try{if(supportsTransactions){conn.setAutoCommit(false)//Everythinginthesamedbtransaction.conn.setTransactionIsolation(finalIsolationLevel)}//valstmt=insertStatement(conn,table,rddSchema,dialect)//此处采用最新自己的sql语句,封装成prepareStatementvalstmt=conn.prepareStatement(sqlStmt)println(sqlStmt)/***在mysql中有这样的操作:*INSERTINTOuser_admin_t(_id,password)VALUES('1','第一次插入的密码')*INSERTINTOuser_admin_t(_id,password)VALUES('1','第一次插入的密码')ONDUPLICATEKEYUPDATE_id='UpId',password='upPassword';*如果是下面的ONDUPLICATEKEY操作,那么在prepareStatement中的游标会扩增一倍*并且如果没有update操作,那么他的游标是从0开始计数的*如果是update操作,要算上之前的insert操作**///makeSetter也要适配update操作,即游标问题valisUpdate=saveMode==CustomSaveMode.Updatevalsetters:Array[JDBCValueSetter]=isUpdatematch{casetrue=>valsetters:Array[JDBCValueSetter]=rddSchema.fields.map(_.dataType).map(makeSetter(conn,dialect,_)).toArrayArray.fill(2)(setters).flattencase_=>rddSchema.fields.map(_.dataType)valnumFieldsLength=rddSchema.fields.lengthvalnumFields=isUpdatematch{casetrue=>numFieldsLength*2case_=>numFieldsLengthvalcursorBegin=numFields/2try{varrowCount=0while(iterator.hasNext){valrow=iterator.next()vari=0while(i<numFields){if(isUpdate){//需要判断当前游标是否走到了ONDUPLICATEKEYUPDATEi<cursorBeginmatch{//说明还没走到update阶段casetrue=>//row.isNullAt判空,则设置空值if(row.isNullAt(i)){stmt.setNull(i+1,nullTypes(i))}else{setters(i).apply(stmt,row,i,0)}//说明走到了update阶段casefalse=>if(row.isNullAt(i-cursorBegin)){//pos-offsetstmt.setNull(i+1,nullTypes(i-cursorBegin))setters(i).apply(stmt,row,i,cursorBegin)}}else{if(row.isNullAt(i)){stmt.setNull(i+1,nullTypes(i))}else{setters(i).apply(stmt,row,i,0)}//滚动游标i=i+1}stmt.addBatch()rowCount+=1if(rowCount%batchSize==0){stmt.executeBatch()rowCount=0}if(rowCount>0){stmt.executeBatch()}finally{stmt.close()conn.commit()committed=trueIterator.empty}catch{casee:SQLException=>valcause=e.getNextExceptionif(cause!=null&&e.getCause!=cause){if(e.getCause==null){e.initCause(cause)}else{e.addSuppressed(cause)throwe}finally{if(!committed){//Thestagemustfail.Wegotherethroughanexceptionpath,so//lettheexceptionthroughunlessrollback()orclose()wantto//telltheuseraboutanotherproblem.if(supportsTransactions){conn.rollback()conn.close()}else{//Thestagemustsucceed.Wecannotpropagateanyexceptionclose()mightthrow.try{conn.close()}catch{casee:Exception=>logWarning("Transactionsucceeded,butclosingfailed",e)

//A`JDBCValueSetter`isresponsibleforsettingavaluefrom`Row`intoafieldfor//`PreparedStatement`.Thelastargument`Int`meanstheindexforthevaluetobeset//intheSQLstatementandalsousedforthevaluein`Row`.//PreparedStatement,Row,position,cursorprivatetypeJDBCValueSetter=(PreparedStatement,Row,Int,Int)=>UnitprivatedefmakeSetter(conn:Connection,dialect:JdbcDialect,dataType:DataType):JDBCValueSetter=dataTypematch{caseIntegerType=>(stmt:PreparedStatement,row:Row,pos:Int,cursor:Int)=>stmt.setInt(pos+1,row.getInt(pos-cursor))caseLongType=>stmt.setLong(pos+1,row.getLong(pos-cursor))caseDoubleType=>stmt.setDouble(pos+1,row.getDouble(pos-cursor))caseFloatType=>stmt.setFloat(pos+1,row.getFloat(pos-cursor))caseShortType=>stmt.setInt(pos+1,row.getShort(pos-cursor))caseByteType=>stmt.setInt(pos+1,row.getByte(pos-cursor))caseBooleanType=>stmt.setBoolean(pos+1,row.getBoolean(pos-cursor))caseStringType=>//println(row.getString(pos))stmt.setString(pos+1,row.getString(pos-cursor))caseBinaryType=>stmt.setBytes(pos+1,row.getAs[Array[Byte]](pos-cursor))caseTimestampType=>stmt.setTimestamp(pos+1,row.getAs[java.sql.Timestamp](pos-cursor))caseDateType=>stmt.setDate(pos+1,row.getAs[java.sql.Date](pos-cursor))caset:DecimalType=>stmt.setBigDecimal(pos+1,row.getDecimal(pos-cursor))caseArrayType(et,_)=>//removetypelengthparametersfromendoftypenamevaltypeName=getJdbcType(et,dialect).databaseTypeDefinition.toLowerCase.split("\\(")(0)valarray=conn.createArrayOf(typeName,row.getSeq[AnyRef](pos-cursor).toArray)stmt.setArray(pos+1,array)case_=>(_:PreparedStatement,_:Row,pos:Int,cursor:Int)=>thrownewIllegalArgumentException(s"Can'ttranslatenon-nullvalueforfield$pos")}

以上就是关于“怎么让sparksql写mysql时支持update操作”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注恰卡编程网行业资讯频道。

发布于 2022-02-15 20:39:21
收藏
分享
海报
0 条评论
34
上一篇:怎么用python绘制精美地图海报 下一篇:javascript面向对象编程的知识点有哪些
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码