Apache Beam - 即使程序连续执行,也会捕获并抛出异常。如何停止该进程或在管道中处理

Apache Beam - Exception caught and throwed even though program excuting continuosly. How to stop that process or handle in pipeline

我有一个获取数据 mysql 并用于将数据传输到 mongo 数据库的管道 在 运行 使用以下代码的管道之后,数据从 mysql 获取但无法加载到 mongodb

noSqlresult.apply(MongoDbIO.write().withUri(mongoUri)
                .withDatabase(mongoDatabase)
                .withCollection(resultCollectionName));

我发现了以下异常和一些不断尝试与 mongo db

通信的日志
com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='mongoUser', source='db1', password=<hidden>, mechanismProperties={}}
at com.mongodb.connection.SaslAuthenticator.wrapException(SaslAuthenticator.java:162)
at com.mongodb.connection.SaslAuthenticator.access0(SaslAuthenticator.java:39)
at com.mongodb.connection.SaslAuthenticator.run(SaslAuthenticator.java:68)
at com.mongodb.connection.SaslAuthenticator.run(SaslAuthenticator.java:46)
at com.mongodb.connection.SaslAuthenticator.doAsSubject(SaslAuthenticator.java:168)
at com.mongodb.connection.SaslAuthenticator.authenticate(SaslAuthenticator.java:46)
at com.mongodb.connection.DefaultAuthenticator.authenticate(DefaultAuthenticator.java:32)
at com.mongodb.connection.InternalStreamConnectionInitializer.authenticateAll(InternalStreamConnectionInitializer.java:122)
at com.mongodb.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:52)
at com.mongodb.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
at com.mongodb.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:114)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server severip:27017. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }
    at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:164)
    at com.mongodb.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:295)
    at com.mongodb.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.connection.CommandHelper.sendAndReceive(CommandHelper.java:84)
    at com.mongodb.connection.CommandHelper.executeCommand(CommandHelper.java:34)
    at com.mongodb.connection.SaslAuthenticator.sendSaslStart(SaslAuthenticator.java:119)
    at com.mongodb.connection.SaslAuthenticator.access[=11=]0(SaslAuthenticator.java:39)
    at com.mongodb.connection.SaslAuthenticator.run(SaslAuthenticator.java:52)
    ... 9 more
18/11/09 12:49:29 DEBUG org.mongodb.driver.cluster: Updating cluster description to  {type=UNKNOWN, servers=[{address=severip:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='mongoUser', source='db1', password=<hidden>, mechanismProperties={}}}, caused by {com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server severip:27017. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }}}]
18/11/09 12:49:29 DEBUG org.mongodb.driver.connection: Closing connection connectionId{localValue:17}
18/11/09 12:49:29 DEBUG org.mongodb.driver.cluster: Updating cluster description to  {type=UNKNOWN, servers=[{address=severip:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=null, userName='mongoUser', source='db1', password=<hidden>, mechanismProperties={}}}, caused by {com.mongodb.MongoCommandException: Command failed with error 18: 'Authentication failed.' on server severip:27017. The full response is { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" }}}]
18/11/09 12:49:29 DEBUG org.mongodb.driver.connection: Closing connection connectionId{localValue:18}

如何处理这种情况 mongo 可以创建数据库(如果不存在)但不能在内部创建 mongoIO 使用 mongo 客户端进行连接。有没有可能处理

显示它的错误是由于注意力问题,但现有数据库未获取异常,仅获取新数据库 确切原因是什么以及如何处理这些错误

没有对象来存储ptransform的结果,所以需要等待连接超时异常,之后如果声明其他连接自动关闭,你可以执行你在catch块中声明的操作。

如果你是处理异常的新手,我想你会从下面得到答案link