Rollback mechanism in the Kafka Streams Processor API?

I am using the Kafka Streams Processor API (not the DSL):

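import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;

public class StreamProcessor implements Processor<String, String>
{
    private ProcessorContext context;
    private KeyValueStore<String, String> stateStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context)
    {
        this.context = context;
        context.commit();
        // state store initialized with key,value; "topic-store" is a placeholder store name
        this.stateStore = (KeyValueStore<String, String>) context.getStateStore("topic-store");
    }

    @Override
    public void process(String key, String val)
    {
        try
        {
            // split() takes a regex, so the pipe has to be escaped
            String[] topicList = stateStore.get(key).split("\\|");
            for (String topic : topicList)
            {
                // forward the same message to a list of topics (1..n topics);
                // assumes each sink node is named after its topic
                context.forward(key, val, To.child(topic));
            }
        }
        catch (RuntimeException e)
        {
            // rollback if the write to some topics failed?
            throw e;
        }
    }

    @Override
    public void close() {}
}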

Scenario: we read data from a source topic, and the stream processor writes it to multiple sink topics (topicList above).

Question: How can I implement a rollback mechanism with the Kafka Streams Processor API when one or more of the topics in topicList fail to receive the message?

My understanding is that the Processor API can roll back each record it failed to send. Can a rollback also be done for an entire batch of failed messages? Since the process method of the Processor interface is called per record rather than per batch, I would surmise that it can only be done per record. Is this assumption correct? If not, please suggest how to achieve per-record and per-batch rollbacks for failed topics using the Processor API.

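You need to implement it yourself. For example, you can use two stores: a main store and a "buffer" store. First update only the buffer store, then call context.forward() to make sure all writes are in the output topics, and then merge the "buffer" store into the main store.

If you need to roll back, delete the content from the buffer store.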
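
The two-store idea can be sketched roughly as follows. This is only an illustration under several assumptions: the stores are modeled with plain HashMaps rather than Kafka Streams KeyValueStore instances, the class name BufferedStoreSketch and its methods are hypothetical, and the forwarder callback stands in for context.forward() and is assumed to throw when a write fails. In a real topology, writes to sink topics are sent asynchronously, so a failure may not surface as an exception from forward(); the failure signal would have to come from wherever your application detects it. Note also that the rollback only discards the staged state; records already forwarded to earlier topics in the list are not undone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: plain maps stand in for the main and "buffer" state stores.
class BufferedStoreSketch {
    private final Map<String, String> mainStore = new HashMap<>();
    private final Map<String, String> bufferStore = new HashMap<>();

    /**
     * Stages the update in the buffer store, forwards the value to every topic,
     * and merges the staged update into the main store only if all forwards succeed.
     * Returns false (after dropping the staged update) if any forward throws.
     */
    boolean process(String key, String val, List<String> topics,
                    BiConsumer<String, String> forwarder) {
        bufferStore.put(key, val);                 // 1. update only the buffer store
        try {
            for (String topic : topics) {
                forwarder.accept(topic, val);      // 2. stand-in for context.forward(...)
            }
        } catch (RuntimeException e) {
            bufferStore.remove(key);               // rollback: drop the staged content
            return false;
        }
        mainStore.put(key, bufferStore.remove(key)); // 3. merge buffer into main store
        return true;
    }

    String committed(String key) {
        return mainStore.get(key);
    }

    boolean hasPending(String key) {
        return bufferStore.containsKey(key);
    }
}
```

Because process() is invoked once per record, this sketch rolls back per record. A per-batch variant could follow the same pattern by keeping several staged keys in the buffer store and merging or clearing them together, for example from a punctuation callback.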