如何使用 Sawtooth Java SDK 创建通道事件?

How to create channel events using Sawtooth Java SDK?

HyperLeger Sawtooth 支持订阅交易处理器中的事件。但是,有没有一种方法可以在事务处理器中创建特定于应用程序的事件,例如此处的 Python 示例:https://www.jacklllll.xyz/blog/2019/04/08/sawtooth/

ctx.addEvent(
 'agreement/create',
 [['name', 'agreement'],
 ['address', address], 
 ['buyer name', agreement.BuyerName], 
 ['seller name', agreement.SellerName], 
 ['house id', agreement.HouseID], 
 ['creator', signer]],
  null)

在当前的 Sawtooth-Java SDK v0.1.2 中,唯一的覆盖是

apply(TpProcessRequest, State)

没有上下文。但是在此处的文档中:https://github.com/hyperledger/sawtooth-sdk-java/blob/master/sawtooth-sdk-transaction-processor/src/main/java/sawtooth/sdk/processor/TransactionHandler.java

addEvent(TpProcessRequest, Context)

到目前为止,我已经设法监听了事件 sawtooth/state-delta 但是这给了我那个 tx-family

的所有状态变化
import sawtooth.sdk.protobuf.EventSubscription;
import sawtooth.sdk.protobuf.EventFilter;
import sawtooth.sdk.protobuf.ClientEventsSubscribeRequest;
import sawtooth.sdk.protobuf.ClientEventsSubscribeResponse;
import sawtooth.sdk.protobuf.ClientEventsUnsubscribeRequest;
import sawtooth.sdk.protobuf.Message;

EventFilter filter = EventFilter.newBuilder()
                     .setKey("address")
                     .setMatchString(nameSpace.concat(".*"))
                     .setFilterType(EventFilter.FilterType.REGEX_ANY)
                     .build();

EventSubscription subscription = EventSubscription.newBuilder()
                      .setEventType("sawtooth/state-delta")
                      .addFilters(filter)
                      .build();

context = new ZContext();
socket = context.createSocket(ZMQ.DEALER);
socket.connect("tcp://sawtooth-rest:4004");

ClientEventsSubscribeRequest request = ClientEventsSubscribeRequest.newBuilder()
               .addSubscriptions(subscription)
               .build();

message = Message.newBuilder()
          .setCorrelationId("123")
           .setMessageType(Message.MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST)
          .setContent(request.toByteString())
          .build();

socket.send(message.toByteArray());

注册 Message.MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST 后,我会在线程循环中收到消息。

我希望在 TransactionHandler 中我应该能够 addEvent() 或创建某种类型的事件,然后可以使用 Java SDK 进行订阅。

有没有其他人尝试在 Sawtooth 上的 JAVA 中创建自定义事件?

这是在 Python 中添加的事件示例。 Java 类似。 您在交易处理器中添加 custom-named 事件:

context.add_event(event_type="cookiejar/bake", attributes=[("cookies-baked", amount)])

https://github.com/danintel/sawtooth-cookiejar/blob/master/pyprocessor/cookiejar_tp.py#L138

以下是用 Python 和 Go 编写的事件处理程序示例: https://github.com/danintel/sawtooth-cookiejar/tree/master/events Java 也类似。基本上事件处理程序中的逻辑是:

  1. 订阅您想收听的活动
  2. 向验证器发送请求
  3. 读取并解析订阅响应
  4. 在循环中,循环监听订阅的事件
  5. 退出循环后(如果有的话),取消订阅 rom 事件

对于那些试图将 java SDK 用于事件 publishing/subscribing 的用户 - 没有直接可用的 API。至少我找不到它,我正在使用 1.0 docker 图像。 因此,要发布您的事件,您需要直接发布到锯齿波 rest-api 服务器。需要注意以下事项:

  1. 您需要一个仅对每个请求有效的上下文 ID。您可以从 apply() 方法中的请求中获取此信息。 (下面的代码)。因此,请确保在事务发布期间发布事件,即在 apply() 方法
  2. 的实施期间
  3. 事件结构将如文档中所述here
  4. 如果交易成功并且块已提交,您将在事件订阅者中获得事件,否则它不会显示。
  5. 创建订阅者时您需要订阅

sawtooth/block-commit

活动并为您的活动类型添加额外的订阅,例如“myNS/my-event”

示例事件发布代码:

public void apply(TpProcessRequest request, State state) throws InvalidTransactionException, InternalError {

       ///process your trasaction first

        sawtooth.sdk.messaging.Stream eventStream = new Stream("tcp://localhost:4004"); // make this in the constructor of class NOT here
        List<Attribute> attrList = new ArrayList<>();
        Attribute attrs = Attribute.newBuilder().setKey("someKey").setValue("someValue").build();

        attrList.add(attrs);

        Event appEvent = Event.newBuilder().setEventType("myNS/my-event-type")
                .setData( <some ByteString here>).addAllAttributes(attrList).build();
        TpEventAddRequest addEventRequest = TpEventAddRequest.newBuilder()
                .setContextId(request.getContextId()).setEvent(appEvent).build();
        Future sawtoothSubsFuture = eventStream.send(MessageType.TP_EVENT_ADD_REQUEST, addEventRequest.toByteString());

        try {
            System.out.println(sawtoothSubsFuture.getResult());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

然后您订阅这样的事件(灵感来自市场样本):

try {
            
            
            EventFilter eventFilter = EventFilter.newBuilder().setKey("address")
                    .setMatchString(String.format("^%s.*", "myNamespace"))
                    .setFilterType(FilterType.REGEX_ANY).build();

            //subscribe to sawtooth/block-commit
            EventSubscription deltaSubscription = EventSubscription.newBuilder().setEventType("sawtooth/block-commit")
                    .addFilters(eventFilter)
                    .build();

            EventSubscription mySubscription = EventSubscription.newBuilder().setEventType("myNS/my-event-type")
                    .build(); //no filters added for my events.
            
            ClientEventsSubscribeRequest subsReq = ClientEventsSubscribeRequest.newBuilder()
                    .addLastKnownBlockIds("0000000000000000").addSubscriptions(deltaSubscription).addSubscriptions(mySubscription)
                    .build();

            Future sawtoothSubsFuture = eventStream.send(MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST,
                    subsReq.toByteString());

            ClientEventsSubscribeResponse eventSubsResp = ClientEventsSubscribeResponse
                    .parseFrom(sawtoothSubsFuture.getResult());

            System.out.println("eventSubsResp.getStatus() :: " + eventSubsResp.getStatus());
            
             if (eventSubsResp.getStatus().equals(ClientEventsSubscribeResponse.Status.UNKNOWN_BLOCK)) {
                 System.out.println("Unknown block ");
                 // retry connection if this happens by calling this same method
                 
             }
            if(!eventSubsResp.getStatus().equals(ClientEventsSubscribeResponse.Status.OK)) {
                System.out.println("Subscription failed with status " + eventSubsResp.getStatus());
                throw new RuntimeException("cannot connect ");
            } else {
                isActive = true;
                System.out.println("Making active ");
            }
            
            
            while(isActive) {
                Message eventMsg =  eventStream.receive();
                EventList eventList = EventList.parseFrom(eventMsg.getContent());
                for (Event event : eventList.getEventsList()) {                 
                    System.out.println("An event ::::");
                    System.out.println(event);
                }
                
            }
        } catch (Exception e) {
            e.printStackTrace();
        }