为什么我的 streamparse 拓扑定义抱怨 thrift$mk-topology 的参数数量错误?
Why is my streamparse topology definition complaining about a wrong number of arguments to thrift$mk-topology?
我想让一个非常简单的 streamparse(即 Apache Storm)spout 运行起来,但在执行 sparse run -t 120 时收到了以下错误:
Caught exception: Wrong number of args (1) passed to: thrift$mk-topology
clojure.lang.ArityException: Wrong number of args (1) passed to: thrift$mk-topology
at clojure.lang.AFn.throwArity (AFn.java:437)
clojure.lang.AFn.invoke (AFn.java:39)
clojure.lang.AFn.applyToHelper (AFn.java:161)
clojure.lang.AFn.applyTo (AFn.java:151)
clojure.core$apply.invoke (core.clj:617)
streamparse.commands.run$run_local_BANG_.invoke (run.clj:20)
streamparse.commands.run$_main.doInvoke (run.clj:79)
clojure.lang.RestFn.invoke (RestFn.java:930)
clojure.lang.Var.invoke (Var.java:460)
user$eval5.invoke (form-init1145748518959444179.clj:1)
clojure.lang.Compiler.eval (Compiler.java:6619)
clojure.lang.Compiler.eval (Compiler.java:6609)
clojure.lang.Compiler.load (Compiler.java:7064)
clojure.lang.Compiler.loadFile (Compiler.java:7020)
clojure.main$load_script.invoke (main.clj:294)
clojure.main$init_opt.invoke (main.clj:299)
clojure.main$initialize.invoke (main.clj:327)
clojure.main$null_opt.invoke (main.clj:362)
clojure.main$main.doInvoke (main.clj:440)
clojure.lang.RestFn.invoke (RestFn.java:421)
clojure.lang.Var.invoke (Var.java:419)
clojure.lang.AFn.applyToHelper (AFn.java:163)
clojure.lang.Var.applyTo (Var.java:532)
clojure.main.main (main.java:37)
993 [main] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.NullPointerException: null
at streamparse.commands.run$run_local_BANG_.invoke(run.clj:33) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
at streamparse.commands.run$_main.doInvoke(run.clj:79) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:930) ~[clojure-1.5.1.jar:na]
at clojure.lang.Var.invoke(Var.java:460) ~[clojure-1.5.1.jar:na]
at user$eval5.invoke(form-init1145748518959444179.clj:1) ~[na:na]
at clojure.lang.Compiler.eval(Compiler.java:6619) ~[clojure-1.5.1.jar:na]
at clojure.lang.Compiler.eval(Compiler.java:6609) ~[clojure-1.5.1.jar:na]
at clojure.lang.Compiler.load(Compiler.java:7064) ~[clojure-1.5.1.jar:na]
at clojure.lang.Compiler.loadFile(Compiler.java:7020) ~[clojure-1.5.1.jar:na]
at clojure.main$load_script.invoke(main.clj:294) ~[clojure-1.5.1.jar:na]
at clojure.main$init_opt.invoke(main.clj:299) ~[clojure-1.5.1.jar:na]
at clojure.main$initialize.invoke(main.clj:327) ~[clojure-1.5.1.jar:na]
at clojure.main$null_opt.invoke(main.clj:362) ~[clojure-1.5.1.jar:na]
at clojure.main$main.doInvoke(main.clj:440) ~[clojure-1.5.1.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:421) ~[clojure-1.5.1.jar:na]
at clojure.lang.Var.invoke(Var.java:419) ~[clojure-1.5.1.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:163) ~[clojure-1.5.1.jar:na]
at clojure.lang.Var.applyTo(Var.java:532) ~[clojure-1.5.1.jar:na]
at clojure.main.main(main.java:37) ~[clojure-1.5.1.jar:na]
Traceback (most recent call last):
File "/usr/local/bin/sparse", line 9, in <module>
load_entry_point('streamparse==1.1.0', 'console_scripts', 'sparse')()
File "/usr/local/lib/python2.7/dist-packages/streamparse/cmdln.py", line 83, in main
run_local_topology(args["--name"], time, par, options, args["--debug"])
File "/home/louis/.local/lib/python2.7/site-packages/invoke/tasks.py", line 111, in __call__
result = self.body(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/invoke.py", line 182, in run_local_topology
run(full_cmd)
File "/home/louis/.local/lib/python2.7/site-packages/invoke/runner.py", line 349, in run
return runner.run(command, **kwargs)
File "/home/louis/.local/lib/python2.7/site-packages/invoke/runner.py", line 153, in run
raise Failure(result)
invoke.exceptions.Failure: Command execution failure!
Exit code: 1
Stderr:
我的拓扑.clj
文件如下:
(ns facetrack
(:use [streamparse.specs])
(:gen-class))
(defn facetrack
  "Topology definition. Returns a vector holding a single map, which binds
  the name frame-spout to the spec built by python-spout-spec from options,
  the class path spouts.frames.FrameSpout and the vector [frame]."
  [options]
  [;; the only element: the spout map
   {"frame-spout" (python-spout-spec options
                                     "spouts.frames.FrameSpout"
                                     ["frame"])}])
最后,这是 spout 的 .py
文件。它初始化网络摄像头并开始以 numpy 数组的形式生成帧:
from __future__ import absolute_import, unicode_literals
import logging
import cv2
from streamparse.spout import Spout
class FrameSpout(Spout):
    """Spout that reads frames from a video capture device and emits them."""

    def initialize(self, stormconf, context):
        """Open capture device 0 and keep the handle on the instance."""
        self.cam = cv2.VideoCapture(0)

    def next_tuple(self):
        """Read one frame and emit it as a single-value tuple.

        When the read reports failure, nothing is emitted and a debug
        message is logged instead.
        """
        ok, frame = self.cam.read()
        if ok:
            self.emit([frame])
            return
        logging.debug("Failed to grab frame")
我确信我忽略了一些简单的东西,但由于我对 streamparse 和 Storm 很陌生,所以我不确定去哪里找。任何建议将不胜感激!
问题在于拓扑定义必须返回一个包含两个映射(map)的列表:第一个映射用于定义 spout,第二个映射用于定义 bolt。要创建只含 spout 的拓扑,解决办法是在 .clj 文件中添加第二个空映射:
(ns facetrack
(:use [streamparse.specs])
(:gen-class))
(defn facetrack
  "Topology definition. Returns a vector of two maps: the first binds the
  name frame-spout to the spec built by python-spout-spec from options, the
  class path spouts.frames.FrameSpout and the vector [frame]; the second is
  empty."
  [options]
  [;; first map: spout configuration
   {"frame-spout" (python-spout-spec options
                                     "spouts.frames.FrameSpout"
                                     ["frame"])}
   ;; second map: intentionally empty -- this is the element that was added
   {}])
我想让一个非常简单的 streamparse(即 Apache Storm)spout 运行起来,但在执行 sparse run -t 120 时收到了以下错误:
Caught exception: Wrong number of args (1) passed to: thrift$mk-topology
clojure.lang.ArityException: Wrong number of args (1) passed to: thrift$mk-topology
at clojure.lang.AFn.throwArity (AFn.java:437)
clojure.lang.AFn.invoke (AFn.java:39)
clojure.lang.AFn.applyToHelper (AFn.java:161)
clojure.lang.AFn.applyTo (AFn.java:151)
clojure.core$apply.invoke (core.clj:617)
streamparse.commands.run$run_local_BANG_.invoke (run.clj:20)
streamparse.commands.run$_main.doInvoke (run.clj:79)
clojure.lang.RestFn.invoke (RestFn.java:930)
clojure.lang.Var.invoke (Var.java:460)
user$eval5.invoke (form-init1145748518959444179.clj:1)
clojure.lang.Compiler.eval (Compiler.java:6619)
clojure.lang.Compiler.eval (Compiler.java:6609)
clojure.lang.Compiler.load (Compiler.java:7064)
clojure.lang.Compiler.loadFile (Compiler.java:7020)
clojure.main$load_script.invoke (main.clj:294)
clojure.main$init_opt.invoke (main.clj:299)
clojure.main$initialize.invoke (main.clj:327)
clojure.main$null_opt.invoke (main.clj:362)
clojure.main$main.doInvoke (main.clj:440)
clojure.lang.RestFn.invoke (RestFn.java:421)
clojure.lang.Var.invoke (Var.java:419)
clojure.lang.AFn.applyToHelper (AFn.java:163)
clojure.lang.Var.applyTo (Var.java:532)
clojure.main.main (main.java:37)
993 [main] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.NullPointerException: null
at streamparse.commands.run$run_local_BANG_.invoke(run.clj:33) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
at streamparse.commands.run$_main.doInvoke(run.clj:79) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:930) ~[clojure-1.5.1.jar:na]
at clojure.lang.Var.invoke(Var.java:460) ~[clojure-1.5.1.jar:na]
at user$eval5.invoke(form-init1145748518959444179.clj:1) ~[na:na]
at clojure.lang.Compiler.eval(Compiler.java:6619) ~[clojure-1.5.1.jar:na]
at clojure.lang.Compiler.eval(Compiler.java:6609) ~[clojure-1.5.1.jar:na]
at clojure.lang.Compiler.load(Compiler.java:7064) ~[clojure-1.5.1.jar:na]
at clojure.lang.Compiler.loadFile(Compiler.java:7020) ~[clojure-1.5.1.jar:na]
at clojure.main$load_script.invoke(main.clj:294) ~[clojure-1.5.1.jar:na]
at clojure.main$init_opt.invoke(main.clj:299) ~[clojure-1.5.1.jar:na]
at clojure.main$initialize.invoke(main.clj:327) ~[clojure-1.5.1.jar:na]
at clojure.main$null_opt.invoke(main.clj:362) ~[clojure-1.5.1.jar:na]
at clojure.main$main.doInvoke(main.clj:440) ~[clojure-1.5.1.jar:na]
at clojure.lang.RestFn.invoke(RestFn.java:421) ~[clojure-1.5.1.jar:na]
at clojure.lang.Var.invoke(Var.java:419) ~[clojure-1.5.1.jar:na]
at clojure.lang.AFn.applyToHelper(AFn.java:163) ~[clojure-1.5.1.jar:na]
at clojure.lang.Var.applyTo(Var.java:532) ~[clojure-1.5.1.jar:na]
at clojure.main.main(main.java:37) ~[clojure-1.5.1.jar:na]
Traceback (most recent call last):
File "/usr/local/bin/sparse", line 9, in <module>
load_entry_point('streamparse==1.1.0', 'console_scripts', 'sparse')()
File "/usr/local/lib/python2.7/dist-packages/streamparse/cmdln.py", line 83, in main
run_local_topology(args["--name"], time, par, options, args["--debug"])
File "/home/louis/.local/lib/python2.7/site-packages/invoke/tasks.py", line 111, in __call__
result = self.body(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/invoke.py", line 182, in run_local_topology
run(full_cmd)
File "/home/louis/.local/lib/python2.7/site-packages/invoke/runner.py", line 349, in run
return runner.run(command, **kwargs)
File "/home/louis/.local/lib/python2.7/site-packages/invoke/runner.py", line 153, in run
raise Failure(result)
invoke.exceptions.Failure: Command execution failure!
Exit code: 1
Stderr:
我的拓扑.clj
文件如下:
(ns facetrack
(:use [streamparse.specs])
(:gen-class))
(defn facetrack
  "Topology definition. Returns a vector holding a single map, which binds
  the name frame-spout to the spec built by python-spout-spec from options,
  the class path spouts.frames.FrameSpout and the vector [frame]."
  [options]
  [;; the only element: the spout map
   {"frame-spout" (python-spout-spec options
                                     "spouts.frames.FrameSpout"
                                     ["frame"])}])
最后,这是 spout 的 .py
文件。它初始化网络摄像头并开始以 numpy 数组的形式生成帧:
from __future__ import absolute_import, unicode_literals
import logging
import cv2
from streamparse.spout import Spout
class FrameSpout(Spout):
    """Spout that reads frames from a video capture device and emits them."""

    def initialize(self, stormconf, context):
        """Open capture device 0 and keep the handle on the instance."""
        self.cam = cv2.VideoCapture(0)

    def next_tuple(self):
        """Read one frame and emit it as a single-value tuple.

        When the read reports failure, nothing is emitted and a debug
        message is logged instead.
        """
        ok, frame = self.cam.read()
        if ok:
            self.emit([frame])
            return
        logging.debug("Failed to grab frame")
我确信我忽略了一些简单的东西,但由于我对 streamparse 和 Storm 很陌生,所以我不确定去哪里找。任何建议将不胜感激!
问题在于拓扑定义必须返回一个包含两个映射(map)的列表:第一个映射用于定义 spout,第二个映射用于定义 bolt。要创建只含 spout 的拓扑,解决办法是在 .clj 文件中添加第二个空映射:
(ns facetrack
(:use [streamparse.specs])
(:gen-class))
(defn facetrack
  "Topology definition. Returns a vector of two maps: the first binds the
  name frame-spout to the spec built by python-spout-spec from options, the
  class path spouts.frames.FrameSpout and the vector [frame]; the second is
  empty."
  [options]
  [;; first map: spout configuration
   {"frame-spout" (python-spout-spec options
                                     "spouts.frames.FrameSpout"
                                     ["frame"])}
   ;; second map: intentionally empty -- this is the element that was added
   {}])