如何使用 Sawtooth Java SDK 创建通道事件?
How to create channel events using Sawtooth Java SDK?
HyperLeger Sawtooth 支持订阅交易处理器中的事件。但是,有没有一种方法可以在事务处理器中创建特定于应用程序的事件,例如此处的 Python 示例:https://www.jacklllll.xyz/blog/2019/04/08/sawtooth/
ctx.addEvent(
'agreement/create',
[['name', 'agreement'],
['address', address],
['buyer name', agreement.BuyerName],
['seller name', agreement.SellerName],
['house id', agreement.HouseID],
['creator', signer]],
null)
在当前的 Sawtooth-Java SDK v0.1.2 中,唯一的覆盖是
apply(TpProcessRequest, State)
addEvent(TpProcessRequest, Context)
到目前为止,我已经设法监听了事件 sawtooth/state-delta
但是这给了我那个 tx-family
的所有状态变化
import sawtooth.sdk.protobuf.EventSubscription;
import sawtooth.sdk.protobuf.EventFilter;
import sawtooth.sdk.protobuf.ClientEventsSubscribeRequest;
import sawtooth.sdk.protobuf.ClientEventsSubscribeResponse;
import sawtooth.sdk.protobuf.ClientEventsUnsubscribeRequest;
import sawtooth.sdk.protobuf.Message;
EventFilter filter = EventFilter.newBuilder()
.setKey("address")
.setMatchString(nameSpace.concat(".*"))
.setFilterType(EventFilter.FilterType.REGEX_ANY)
.build();
EventSubscription subscription = EventSubscription.newBuilder()
.setEventType("sawtooth/state-delta")
.addFilters(filter)
.build();
context = new ZContext();
socket = context.createSocket(ZMQ.DEALER);
socket.connect("tcp://sawtooth-rest:4004");
ClientEventsSubscribeRequest request = ClientEventsSubscribeRequest.newBuilder()
.addSubscriptions(subscription)
.build();
message = Message.newBuilder()
.setCorrelationId("123")
.setMessageType(Message.MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST)
.setContent(request.toByteString())
.build();
socket.send(message.toByteArray());
注册 Message.MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST
后,我会在线程循环中收到消息。
我希望在 TransactionHandler 中我应该能够 addEvent()
或创建某种类型的事件,然后可以使用 Java SDK 进行订阅。
有没有其他人尝试在 Sawtooth 上的 JAVA 中创建自定义事件?
这是在 Python 中添加的事件示例。 Java 类似。
您在交易处理器中添加 custom-named 事件:
context.add_event(event_type="cookiejar/bake", attributes=[("cookies-baked", amount)])
见https://github.com/danintel/sawtooth-cookiejar/blob/master/pyprocessor/cookiejar_tp.py#L138
以下是用 Python 和 Go 编写的事件处理程序示例:
https://github.com/danintel/sawtooth-cookiejar/tree/master/events
Java 也类似。基本上事件处理程序中的逻辑是:
- 订阅您想收听的活动
- 向验证器发送请求
- 读取并解析订阅响应
- 在循环中,循环监听订阅的事件
- 退出循环后(如果有的话),取消订阅 rom 事件
对于那些试图将 java SDK 用于事件 publishing/subscribing 的用户 - 没有直接可用的 API。至少我找不到它,我正在使用 1.0 docker 图像。
因此,要发布您的事件,您需要直接发布到锯齿波 rest-api 服务器。需要注意以下事项:
- 您需要一个仅对每个请求有效的上下文 ID。您可以从 apply() 方法中的请求中获取此信息。 (下面的代码)。因此,请确保在事务发布期间发布事件,即在 apply() 方法
的实施期间
- 事件结构将如文档中所述here
- 如果交易成功并且块已提交,您将在事件订阅者中获得事件,否则它不会显示。
- 创建订阅者时您需要订阅
sawtooth/block-commit
活动并为您的活动类型添加额外的订阅,例如“myNS/my-event”
示例事件发布代码:
public void apply(TpProcessRequest request, State state) throws InvalidTransactionException, InternalError {
///process your trasaction first
sawtooth.sdk.messaging.Stream eventStream = new Stream("tcp://localhost:4004"); // make this in the constructor of class NOT here
List<Attribute> attrList = new ArrayList<>();
Attribute attrs = Attribute.newBuilder().setKey("someKey").setValue("someValue").build();
attrList.add(attrs);
Event appEvent = Event.newBuilder().setEventType("myNS/my-event-type")
.setData( <some ByteString here>).addAllAttributes(attrList).build();
TpEventAddRequest addEventRequest = TpEventAddRequest.newBuilder()
.setContextId(request.getContextId()).setEvent(appEvent).build();
Future sawtoothSubsFuture = eventStream.send(MessageType.TP_EVENT_ADD_REQUEST, addEventRequest.toByteString());
try {
System.out.println(sawtoothSubsFuture.getResult());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
然后您订阅这样的事件(灵感来自市场样本):
try {
EventFilter eventFilter = EventFilter.newBuilder().setKey("address")
.setMatchString(String.format("^%s.*", "myNamespace"))
.setFilterType(FilterType.REGEX_ANY).build();
//subscribe to sawtooth/block-commit
EventSubscription deltaSubscription = EventSubscription.newBuilder().setEventType("sawtooth/block-commit")
.addFilters(eventFilter)
.build();
EventSubscription mySubscription = EventSubscription.newBuilder().setEventType("myNS/my-event-type")
.build(); //no filters added for my events.
ClientEventsSubscribeRequest subsReq = ClientEventsSubscribeRequest.newBuilder()
.addLastKnownBlockIds("0000000000000000").addSubscriptions(deltaSubscription).addSubscriptions(mySubscription)
.build();
Future sawtoothSubsFuture = eventStream.send(MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST,
subsReq.toByteString());
ClientEventsSubscribeResponse eventSubsResp = ClientEventsSubscribeResponse
.parseFrom(sawtoothSubsFuture.getResult());
System.out.println("eventSubsResp.getStatus() :: " + eventSubsResp.getStatus());
if (eventSubsResp.getStatus().equals(ClientEventsSubscribeResponse.Status.UNKNOWN_BLOCK)) {
System.out.println("Unknown block ");
// retry connection if this happens by calling this same method
}
if(!eventSubsResp.getStatus().equals(ClientEventsSubscribeResponse.Status.OK)) {
System.out.println("Subscription failed with status " + eventSubsResp.getStatus());
throw new RuntimeException("cannot connect ");
} else {
isActive = true;
System.out.println("Making active ");
}
while(isActive) {
Message eventMsg = eventStream.receive();
EventList eventList = EventList.parseFrom(eventMsg.getContent());
for (Event event : eventList.getEventsList()) {
System.out.println("An event ::::");
System.out.println(event);
}
}
} catch (Exception e) {
e.printStackTrace();
}
HyperLeger Sawtooth 支持订阅交易处理器中的事件。但是,有没有一种方法可以在事务处理器中创建特定于应用程序的事件,例如此处的 Python 示例:https://www.jacklllll.xyz/blog/2019/04/08/sawtooth/
ctx.addEvent(
'agreement/create',
[['name', 'agreement'],
['address', address],
['buyer name', agreement.BuyerName],
['seller name', agreement.SellerName],
['house id', agreement.HouseID],
['creator', signer]],
null)
在当前的 Sawtooth-Java SDK v0.1.2 中,唯一的覆盖是
apply(TpProcessRequest, State)
addEvent(TpProcessRequest, Context)
到目前为止,我已经设法监听了事件 sawtooth/state-delta
但是这给了我那个 tx-family
import sawtooth.sdk.protobuf.EventSubscription;
import sawtooth.sdk.protobuf.EventFilter;
import sawtooth.sdk.protobuf.ClientEventsSubscribeRequest;
import sawtooth.sdk.protobuf.ClientEventsSubscribeResponse;
import sawtooth.sdk.protobuf.ClientEventsUnsubscribeRequest;
import sawtooth.sdk.protobuf.Message;
EventFilter filter = EventFilter.newBuilder()
.setKey("address")
.setMatchString(nameSpace.concat(".*"))
.setFilterType(EventFilter.FilterType.REGEX_ANY)
.build();
EventSubscription subscription = EventSubscription.newBuilder()
.setEventType("sawtooth/state-delta")
.addFilters(filter)
.build();
context = new ZContext();
socket = context.createSocket(ZMQ.DEALER);
socket.connect("tcp://sawtooth-rest:4004");
ClientEventsSubscribeRequest request = ClientEventsSubscribeRequest.newBuilder()
.addSubscriptions(subscription)
.build();
message = Message.newBuilder()
.setCorrelationId("123")
.setMessageType(Message.MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST)
.setContent(request.toByteString())
.build();
socket.send(message.toByteArray());
注册 Message.MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST
后,我会在线程循环中收到消息。
我希望在 TransactionHandler 中我应该能够 addEvent()
或创建某种类型的事件,然后可以使用 Java SDK 进行订阅。
有没有其他人尝试在 Sawtooth 上的 JAVA 中创建自定义事件?
这是在 Python 中添加的事件示例。 Java 类似。 您在交易处理器中添加 custom-named 事件:
context.add_event(event_type="cookiejar/bake", attributes=[("cookies-baked", amount)])
见https://github.com/danintel/sawtooth-cookiejar/blob/master/pyprocessor/cookiejar_tp.py#L138
以下是用 Python 和 Go 编写的事件处理程序示例: https://github.com/danintel/sawtooth-cookiejar/tree/master/events Java 也类似。基本上事件处理程序中的逻辑是:
- 订阅您想收听的活动
- 向验证器发送请求
- 读取并解析订阅响应
- 在循环中,循环监听订阅的事件
- 退出循环后(如果有的话),取消订阅 rom 事件
对于那些试图将 java SDK 用于事件 publishing/subscribing 的用户 - 没有直接可用的 API。至少我找不到它,我正在使用 1.0 docker 图像。 因此,要发布您的事件,您需要直接发布到锯齿波 rest-api 服务器。需要注意以下事项:
- 您需要一个仅对每个请求有效的上下文 ID。您可以从 apply() 方法中的请求中获取此信息。 (下面的代码)。因此,请确保在事务发布期间发布事件,即在 apply() 方法 的实施期间
- 事件结构将如文档中所述here
- 如果交易成功并且块已提交,您将在事件订阅者中获得事件,否则它不会显示。
- 创建订阅者时您需要订阅
sawtooth/block-commit
活动并为您的活动类型添加额外的订阅,例如“myNS/my-event”
示例事件发布代码:
public void apply(TpProcessRequest request, State state) throws InvalidTransactionException, InternalError {
///process your trasaction first
sawtooth.sdk.messaging.Stream eventStream = new Stream("tcp://localhost:4004"); // make this in the constructor of class NOT here
List<Attribute> attrList = new ArrayList<>();
Attribute attrs = Attribute.newBuilder().setKey("someKey").setValue("someValue").build();
attrList.add(attrs);
Event appEvent = Event.newBuilder().setEventType("myNS/my-event-type")
.setData( <some ByteString here>).addAllAttributes(attrList).build();
TpEventAddRequest addEventRequest = TpEventAddRequest.newBuilder()
.setContextId(request.getContextId()).setEvent(appEvent).build();
Future sawtoothSubsFuture = eventStream.send(MessageType.TP_EVENT_ADD_REQUEST, addEventRequest.toByteString());
try {
System.out.println(sawtoothSubsFuture.getResult());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
然后您订阅这样的事件(灵感来自市场样本):
try {
EventFilter eventFilter = EventFilter.newBuilder().setKey("address")
.setMatchString(String.format("^%s.*", "myNamespace"))
.setFilterType(FilterType.REGEX_ANY).build();
//subscribe to sawtooth/block-commit
EventSubscription deltaSubscription = EventSubscription.newBuilder().setEventType("sawtooth/block-commit")
.addFilters(eventFilter)
.build();
EventSubscription mySubscription = EventSubscription.newBuilder().setEventType("myNS/my-event-type")
.build(); //no filters added for my events.
ClientEventsSubscribeRequest subsReq = ClientEventsSubscribeRequest.newBuilder()
.addLastKnownBlockIds("0000000000000000").addSubscriptions(deltaSubscription).addSubscriptions(mySubscription)
.build();
Future sawtoothSubsFuture = eventStream.send(MessageType.CLIENT_EVENTS_SUBSCRIBE_REQUEST,
subsReq.toByteString());
ClientEventsSubscribeResponse eventSubsResp = ClientEventsSubscribeResponse
.parseFrom(sawtoothSubsFuture.getResult());
System.out.println("eventSubsResp.getStatus() :: " + eventSubsResp.getStatus());
if (eventSubsResp.getStatus().equals(ClientEventsSubscribeResponse.Status.UNKNOWN_BLOCK)) {
System.out.println("Unknown block ");
// retry connection if this happens by calling this same method
}
if(!eventSubsResp.getStatus().equals(ClientEventsSubscribeResponse.Status.OK)) {
System.out.println("Subscription failed with status " + eventSubsResp.getStatus());
throw new RuntimeException("cannot connect ");
} else {
isActive = true;
System.out.println("Making active ");
}
while(isActive) {
Message eventMsg = eventStream.receive();
EventList eventList = EventList.parseFrom(eventMsg.getContent());
for (Event event : eventList.getEventsList()) {
System.out.println("An event ::::");
System.out.println(event);
}
}
} catch (Exception e) {
e.printStackTrace();
}