sql 服务器如何与 mule 合并?

how do a merge of sql server with mule?

我已经用 foreach 和 execute dll 操作完成了这个,但是当尝试插入 o uptdate 时,字符串会抛出异常,但是使用 int 值可以。

异常:`异常堆栈是:

1. Invalid column name 'Carlos'. (com.microsoft.sqlserver.jdbc.SQLServerException)
  com.microsoft.sqlserver.jdbc.SQLServerException:217 (null)
2. Invalid column name 'Carlos'. (com.microsoft.sqlserver.jdbc.SQLServerException). Message payload is of type: LinkedHashMap (org.mule.api.MessagingException)
  org.mule.module.db.internal.processor.AbstractDbMessageProcessor:93 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
********************************************************************************
Root Exception stack trace:
com.microsoft.sqlserver.jdbc.SQLServerException: Invalid column name 'Carlos'.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1635)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:865)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:762)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:6276)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:1793)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:184)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:159)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:2198)
    at org.mule.module.db.internal.domain.autogeneratedkey.NoAutoGeneratedKeyStrategy.executeUpdate(NoAutoGeneratedKeyStrategy.java:59)
    at org.mule.module.db.internal.domain.executor.UpdateExecutor.doExecuteQuery(UpdateExecutor.java:43)
    at org.mule.module.db.internal.domain.executor.UpdateExecutor.doExecuteQuery(UpdateExecutor.java:37)
    at org.mule.module.db.internal.domain.executor.AbstractSingleQueryExecutor.execute(AbstractSingleQueryExecutor.java:38)
    at org.mule.module.db.internal.processor.ExecuteDdlMessageProcessor.doExecuteQuery(ExecuteDdlMessageProcessor.java:53)
    at org.mule.module.db.internal.processor.AbstractSingleQueryDbMessageProcessor.executeQuery(AbstractSingleQueryDbMessageProcessor.java:42)
    at org.mule.module.db.internal.processor.AbstractDbMessageProcessor.process(AbstractDbMessageProcessor.java:66)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:98)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:59)
    at org.mule.routing.outbound.AbstractMessageSequenceSplitter.processParts(AbstractMessageSequenceSplitter.java:129)
    at org.mule.routing.outbound.AbstractMessageSequenceSplitter.process(AbstractMessageSequenceSplitter.java:59)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:88)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:59)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.routing.Foreach.process(Foreach.java:94)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:107)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:88)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:59)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.BlockingProcessorExecutor.executeNext(BlockingProcessorExecutor.java:98)
    at org.mule.processor.BlockingProcessorExecutor.execute(BlockingProcessorExecutor.java:59)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.processBlocking(AbstractEnvelopeInterceptor.java:58)
    at org.mule.processor.AbstractRequestResponseMessageProcessor.process(AbstractRequestResponseMessageProcessor.java:47)
    at org.mule.processor.AsyncInterceptingMessageProcessor.processNextTimed(AsyncInterceptingMessageProcessor.java:123)
    at o...
********************************************************************************
`

'Carlos' 是有效负载中的一个值。

这是 DLL

BEGIN MERGE [FP].[Profile] AS DESTINO USING (SELECT #[payload.ID]) AS
FUENTE (ID) ON (DESTINO.ID=FUENTE.ID) WHEN MATCHED THEN UPDATE SET 
[UserId]= #[payload.UserId] WHEN NOT MATCHED THEN INSERT 
(ID,UserId,ExternalId,FirstName) VALUES (#[payload.ID],#
[payload.UserId],#[payload.ExternalId],#[payload.FirstName]); END

XML流:

<flow name="insertprofilesindwFlow">
    <file:inbound-endpoint path="C:\Users\LBonaventura\Documents\MuleFilesFromDB" responseTimeout="10000" doc:name="Read the file with the profiles" moveToDirectory="C:\Users\LBonaventura\Documents\MuleFilesFromDB" pollingFrequency="864000000"/>
     <dw:transform-message metadata:id="57e39ab0-6c79-4118-a57e-fe6561a04e9b" doc:name="Transform Message  to map">
        <dw:input-payload doc:sample="list_csv_1.csv"/>
        <dw:set-payload><![CDATA[%dw 1.0 %output application/java ---payload map ((payload01 , indexOfPayload01) -> {


ID: payload01.ID,

ExternalId: payload01.ID as :string,
UserId: payload01.UserId,

FirstName: payload01.FirstName,


LastName: payload01.LastName,



(DateOfBirth: payload01.DateOfBirth as :string) when payload01.DateOfBirth != "",


(DateOfBirth: null) when payload01.DateOfBirth=="",



(Gender:"M") when  payload01.Gender=="1",


(Gender:"F") when  payload01.Gender=="0",


(Gender:null) when payload01.Gender=="",




AllowTracking: payload01.AllowTracking,


Email: payload01.AlertEmail, 

MainProfile: payload01.Main,


Active: payload01.Active as :boolean,
CreatedOn: payload01.DateAdded as :string,
UpdatedOn: payload01.DateUpdated as :string,


LanguageIso: "ESP",


Deleted: false})]]></dw:set-payload>
    </dw:transform-message>
    <foreach doc:name="For Each Profile">
        <db:execute-ddl config-ref="FOX_DW_DATABASE_CONFIGURATION" doc:name="Upsert in the Database">
            <db:dynamic-query><![CDATA[BEGIN MERGE [FP].[Profile] AS DESTINO
USING (SELECT #[payload.ID]) AS FUENTE (ID)
ON (DESTINO.ID=FUENTE.ID)
WHEN MATCHED THEN
    UPDATE SET 

        [UserId]= #[payload.UserId],
        [FirstName]=#[payload.FirstName]


WHEN NOT MATCHED THEN

    INSERT 
        (ID,UserId,ExternalId,[FirstName])
    VALUES
       (#[payload.ID],#[payload.UserId],#[payload.ExternalId],#[payload.FirstName]); END]]></db:dynamic-query>
        </db:execute-ddl>
    </foreach>

</flow>

使用动态查询时,没有参数转换,因此您需要为字符串参数添加引号,因此在您的情况下,它应该类似于:

<db:dynamic-query><![CDATA[BEGIN MERGE [FP].[Profile] AS DESTINO
USING (SELECT #[payload.ID]) AS FUENTE (ID)
ON (DESTINO.ID=FUENTE.ID)
WHEN MATCHED THEN
    UPDATE SET 
        [UserId]= '#[payload.UserId]',
        [FirstName]=#[payload.FirstName]
WHEN NOT MATCHED THEN
    INSERT 
        (ID,UserId,ExternalId,[FirstName])
    VALUES
       (#[payload.ID],'#[payload.UserId]',#[payload.ExternalId],'#[payload.FirstName]'); END]]></db:dynamic-query>

安全注意事项:使用动态查询时请注意,您很容易受到这种方式的SQL注入,因此请确保在将此参数传递给 DLL 之前对其进行清理。