Apache Storm with streamparse

Posted by lnvxswe2 on 2021-06-24 in Storm

I am new to Apache Storm. I am running Apache Storm with streamparse on Windows 10.
I tried to follow the quickstart (http://streamparse.readthedocs.io/en/master/quickstart.html).

First, I installed Python 3.5 and JDK 1.8.0_131.
Second, I downloaded Apache Storm 1.1.0 and extracted it.
Third, I installed ZooKeeper 3.3.6.
Then I set the Windows environment variables like this:

JAVA_HOME=D:\dev\jdk1.8.0_131
STORM_HOME=D:\dev\apache-storm-1.1.0
LEIN_ROOT=D:\dev\leiningen-2.7.1-standalone
Path = %STORM_HOME%\bin;%JAVA_HOME%\bin;D:\Program Files\Python35;D:\Program Files\Python35\Lib\site-packages\;D:\Program Files\Python35\Scripts\;
PATHEXT=.PY;
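
Before starting the daemons, it can help to confirm that these variables are actually visible to a new process and point at real directories. The following is a minimal sketch, assuming only the three variable names listed above; the helper name `missing_or_invalid` is hypothetical and is not part of Storm or streamparse.

```python
import os

# Variable names taken from the settings listed above.
REQUIRED_VARS = ("JAVA_HOME", "STORM_HOME", "LEIN_ROOT")


def missing_or_invalid(environ=None, required=REQUIRED_VARS):
    """Return the names that are unset or do not point at an existing directory."""
    environ = os.environ if environ is None else environ
    problems = []
    for name in required:
        value = environ.get(name)
        if not value or not os.path.isdir(value):
            problems.append(name)
    return problems


if __name__ == "__main__":
    bad = missing_or_invalid()
    if bad:
        print("Check these variables:", ", ".join(bad))
    else:
        print("All required variables point at existing directories.")
```

Running it from a freshly opened cmd window shows whether the settings were picked up, since windows opened before the change keep the old values.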

Then, in cmd, I ran:

"zkServer.cmd"
"storm nimbus"
"storm supervisor"
"storm ui"

These all work fine. But when I run

"sparse quickstart wordcount"
"cd wordcount"
"sparse run"

the command prompt shows this error:

Traceback (most recent call last):
  File "d:\program files\python35\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "d:\program files\python35\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "D:\Program Files\Python35\Scripts\sparse.exe\__main__.py", line 9, in <module>
  File "d:\program files\python35\lib\site-packages\streamparse\cli\sparse.py", line 71, in main
    if os.getuid() == 0 and not os.getenv('LEIN_ROOT'):
AttributeError: module 'os' has no attribute 'getuid'

So I changed line 71 of sparse.py to "if not os.getenv('LEIN_ROOT'):".
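
The AttributeError arises because `os.getuid()` is only available on Unix. Dropping the uid test altogether changes the condition so that it fires for every user without `LEIN_ROOT`, not just root. A sketch of a guard that keeps the original condition on Unix and skips it on Windows might look like the following; the function name `root_without_lein_root` and its parameters are hypothetical and are not streamparse code.

```python
import os


def root_without_lein_root(os_module=os, environ=None):
    """True only when running as root (uid 0) with LEIN_ROOT unset.

    os.getuid() exists on Unix only, so on platforms without it the
    uid check is skipped instead of raising AttributeError.
    """
    environ = os.environ if environ is None else environ
    getuid = getattr(os_module, "getuid", None)
    if getuid is None:
        # No uid 0 on this platform (e.g. Windows): nothing to check.
        return False
    return getuid() == 0 and not environ.get("LEIN_ROOT")
```

The `os_module` and `environ` parameters exist only so the behaviour can be exercised without actually being root or on Windows.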

d:\dev\wordcount\jps
2768 Supervisor
13396 QuorumPeerMain
1492 nimbus
8388 Flux
1016 core
12220 Jps

Here is the log:

2017-07-18 15:07:02.731 o.a.s.z.Zookeeper main [INFO] Staring ZK Curator
2017-07-18 15:07:02.732 o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl main [INFO] Starting
2017-07-18 15:07:02.733 o.a.s.s.o.a.z.ZooKeeper main [INFO] Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@491f8831
2017-07-18 15:07:02.738 o.a.s.s.o.a.z.ClientCnxn main-SendThread(0:0:0:0:0:0:0:1:2000) [INFO] Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
2017-07-18 15:07:02.740 o.a.s.s.o.a.z.ClientCnxn main-SendThread(0:0:0:0:0:0:0:1:2000) [INFO] Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000, initiating session
2017-07-18 15:07:02.730 o.a.s.s.o.a.z.s.NIOServerCnxn NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000 [WARN] caught end of stream exception
org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x15d5485539a000b, likely client has closed socket
    at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.1.0.jar:1.1.0]
    at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.1.0.jar:1.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
2017-07-18 15:07:02.745 o.a.s.s.o.a.z.s.NIOServerCnxn NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000 [INFO] Closed socket connection for client /127.0.0.1:3925 which had sessionid 0x15d5485539a000b
2017-07-18 15:07:02.747 o.a.s.s.o.a.z.s.NIOServerCnxnFactory NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000 [INFO] Accepted socket connection from /0:0:0:0:0:0:0:1:3928
2017-07-18 15:07:02.748 o.a.s.s.o.a.z.s.ZooKeeperServer NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000 [INFO] Client attempting to establish new session at /0:0:0:0:0:0:0:1:3928
2017-07-18 15:07:02.925 o.a.s.s.o.a.z.s.ZooKeeperServer SyncThread:0 [INFO] Established session 0x15d5485539a000c with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:3928
2017-07-18 15:07:02.925 o.a.s.s.o.a.z.ClientCnxn main-SendThread(0:0:0:0:0:0:0:1:2000) [INFO] Session establishment complete on server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2000, sessionid = 0x15d5485539a000c, negotiated timeout = 20000
2017-07-18 15:07:02.925 o.a.s.s.o.a.c.f.s.ConnectionStateManager main-EventThread [INFO] State change: CONNECTED
2017-07-18 15:07:02.948 o.a.s.l.Localizer main [INFO] Reconstruct localized resource: C:\Users\BigTone\AppData\Local\Temp\50d72755-68f7-4d97-86e8-a1e4c1035242\supervisor\usercache
2017-07-18 15:07:02.949 o.a.s.l.Localizer main [WARN] No left over resources found for any user during reconstructing of local resources at: C:\Users\BigTone\AppData\Local\Temp\50d72755-68f7-4d97-86e8-a1e4c1035242\supervisor\usercache
2017-07-18 15:07:02.950 o.a.s.d.s.Supervisor main [INFO] Starting Supervisor with conf {topology.builtin.metrics.bucket.size.secs=60, nimbus.childopts=-Xmx1024m, ui.filter.params=null, storm.cluster.mode=local, storm.messaging.netty.client_worker_threads=1, logviewer.max.per.worker.logs.size.mb=2048, supervisor.run.worker.as.user=false, topology.max.task.parallelism=null, topology.priority=29, zmq.threads=1, storm.group.mapping.service=org.apache.storm.security.auth.ShellBasedGroupsMapping, transactional.zookeeper.root=/transactional, topology.sleep.spout.wait.strategy.time.ms=1, scheduler.display.resource=false, topology.max.replication.wait.time.sec=60, drpc.invocations.port=3773, supervisor.localizer.cache.target.size.mb=10240, topology.multilang.serializer=org.apache.storm.multilang.JsonSerializer, storm.messaging.netty.server_worker_threads=1, nimbus.blobstore.class=org.apache.storm.blobstore.LocalFsBlobStore, resource.aware.scheduler.eviction.strategy=org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy, topology.max.error.report.per.interval=5, storm.thrift.transport=org.apache.storm.security.auth.SimpleTransportPlugin, zmq.hwm=0, storm.group.mapping.service.params=null, worker.profiler.enabled=false, storm.principal.tolocal=org.apache.storm.security.auth.DefaultPrincipalToLocal, supervisor.worker.shutdown.sleep.secs=3, pacemaker.host=localhost, storm.zookeeper.retry.times=5, ui.actions.enabled=true, zmq.linger.millis=0, supervisor.enable=true, topology.stats.sample.rate=0.05, storm.messaging.netty.min_wait_ms=100, worker.log.level.reset.poll.secs=30, storm.zookeeper.port=2000, supervisor.heartbeat.frequency.secs=5, topology.enable.message.timeouts=true, supervisor.cpu.capacity=400.0, drpc.worker.threads=64, supervisor.blobstore.download.thread.count=5, task.backpressure.poll.secs=30, drpc.queue.size=128, topology.backpressure.enable=false, supervisor.blobstore.class=org.apache.storm.blobstore.NimbusBlobStore, 
storm.blobstore.inputstream.buffer.size.bytes=65536, topology.shellbolt.max.pending=100, drpc.https.keystore.password=, nimbus.code.sync.freq.secs=120, logviewer.port=8000, topology.scheduler.strategy=org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy, topology.executor.send.buffer.size=1024, resource.aware.scheduler.priority.strategy=org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy, pacemaker.auth.method=NONE, storm.daemon.metrics.reporter.plugins=[org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter], topology.worker.logwriter.childopts=-Xmx64m, topology.spout.wait.strategy=org.apache.storm.spout.SleepSpoutWaitStrategy, ui.host=0.0.0.0, storm.nimbus.retry.interval.millis=2000, nimbus.inbox.jar.expiration.secs=3600, dev.zookeeper.path=/tmp/dev-storm-zookeeper, topology.acker.executors=null, topology.fall.back.on.java.serialization=true, topology.eventlogger.executors=0, supervisor.localizer.cleanup.interval.ms=600000, storm.zookeeper.servers=[localhost], nimbus.thrift.threads=64, logviewer.cleanup.age.mins=10080, topology.worker.childopts=null, topology.classpath=null, supervisor.monitor.frequency.secs=3, nimbus.credential.renewers.freq.secs=600, topology.skip.missing.kryo.registrations=true, drpc.authorizer.acl.filename=drpc-auth-acl.yaml, pacemaker.kerberos.users=[], storm.group.mapping.service.cache.duration.secs=120, blobstore.dir=C:\Users\BigTone\AppData\Local\Temp\d43926be-beb0-44af-9620-1d547b57a96d, topology.testing.always.try.serialize=false, nimbus.monitor.freq.secs=10, storm.health.check.timeout.ms=5000, supervisor.supervisors=[], topology.tasks=null, topology.bolts.outgoing.overflow.buffer.enable=false, storm.messaging.netty.socket.backlog=500, topology.workers=1, pacemaker.base.threads=10, storm.local.dir=C:\Users\BigTone\AppData\Local\Temp\50d72755-68f7-4d97-86e8-a1e4c1035242, worker.childopts=-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log 
-XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump, storm.auth.simple-white-list.users=[], topology.disruptor.batch.timeout.millis=1, topology.message.timeout.secs=30, topology.state.synchronization.timeout.secs=60, topology.tuple.serializer=org.apache.storm.serialization.types.ListDelegateSerializer, supervisor.supervisors.commands=[], nimbus.blobstore.expiration.secs=600, logviewer.childopts=-Xmx128m, topology.environment=null, topology.debug=false, topology.disruptor.batch.size=100, storm.disable.symlinks=false, storm.messaging.netty.max_retries=300, ui.childopts=-Xmx768m, storm.network.topography.plugin=org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping, storm.zookeeper.session.timeout=20000, drpc.childopts=-Xmx768m, drpc.http.creds.plugin=org.apache.storm.security.auth.DefaultHttpCredentialsPlugin, storm.zookeeper.connection.timeout=15000, storm.zookeeper.auth.user=null, storm.meta.serialization.delegate=org.apache.storm.serialization.GzipThriftSerializationDelegate, topology.max.spout.pending=null, storm.codedistributor.class=org.apache.storm.codedistributor.LocalFileSystemCodeDistributor, nimbus.supervisor.timeout.secs=60, nimbus.task.timeout.secs=30, drpc.port=3772, pacemaker.max.threads=50, storm.zookeeper.retry.intervalceiling.millis=30000, nimbus.thrift.port=6627, storm.auth.simple-acl.admins=[], topology.component.cpu.pcore.percent=10.0, supervisor.memory.capacity.mb=3072.0, storm.nimbus.retry.times=5, supervisor.worker.start.timeout.secs=120, storm.zookeeper.retry.interval=1000, logs.users=null, storm.cluster.metrics.consumer.publish.interval.secs=60, worker.profiler.command=flight.bash, transactional.zookeeper.port=null, drpc.max_buffer_size=1048576, pacemaker.thread.timeout=10, task.credentials.poll.secs=30, blobstore.superuser=BigTone, drpc.https.keystore.type=JKS, 
topology.worker.receiver.thread.count=1, topology.state.checkpoint.interval.ms=1000, supervisor.slots.ports=[1027, 1028, 1029], topology.transfer.buffer.size=1024, storm.health.check.dir=healthchecks, topology.worker.shared.thread.pool.size=4, drpc.authorizer.acl.strict=false, nimbus.file.copy.expiration.secs=600, worker.profiler.childopts=-XX:+UnlockCommercialFeatures -XX:+FlightRecorder, topology.executor.receive.buffer.size=1024, backpressure.disruptor.low.watermark=0.4, nimbus.task.launch.secs=120, storm.local.mode.zmq=false, storm.messaging.netty.buffer_size=5242880, storm.cluster.state.store=org.apache.storm.cluster_state.zookeeper_state_factory, worker.heartbeat.frequency.secs=1, storm.log4j2.conf.dir=log4j2, ui.http.creds.plugin=org.apache.storm.security.auth.DefaultHttpCredentialsPlugin, storm.zookeeper.root=/storm, topology.tick.tuple.freq.secs=null, drpc.https.port=-1, storm.workers.artifacts.dir=workers-artifacts, supervisor.blobstore.download.max_retries=3, task.refresh.poll.secs=10, storm.exhibitor.port=8080, task.heartbeat.frequency.secs=3, pacemaker.port=6699, storm.messaging.netty.max_wait_ms=1000, topology.component.resources.offheap.memory.mb=0.0, drpc.http.port=3774, topology.error.throttle.interval.secs=10, storm.messaging.transport=org.apache.storm.messaging.netty.Context, topology.disable.loadaware.messaging=false, storm.messaging.netty.authentication=false, topology.component.resources.onheap.memory.mb=128.0, topology.kryo.factory=org.apache.storm.serialization.DefaultKryoFactory, worker.gc.childopts=, nimbus.topology.validator=org.apache.storm.nimbus.DefaultTopologyValidator, nimbus.seeds=[localhost], nimbus.queue.size=100000, nimbus.cleanup.inbox.freq.secs=600, storm.blobstore.replication.factor=3, worker.heap.memory.mb=768, logviewer.max.sum.worker.logs.size.mb=4096, pacemaker.childopts=-Xmx1024m, ui.users=null, transactional.zookeeper.servers=null, supervisor.worker.timeout.secs=30, storm.zookeeper.auth.password=null, 
storm.blobstore.acl.validation.enabled=false, client.blobstore.class=org.apache.storm.blobstore.NimbusBlobStore, storm.thrift.socket.timeout.ms=600000, supervisor.childopts=-Xmx256m, topology.worker.max.heap.size.mb=768.0, ui.http.x-frame-options=DENY, backpressure.disruptor.high.watermark=0.9, ui.filter=null, ui.header.buffer.bytes=4096, topology.min.replication.count=1, topology.disruptor.wait.timeout.millis=1000, storm.nimbus.retry.intervalceiling.millis=60000, topology.trident.batch.emit.interval.millis=50, storm.auth.simple-acl.users=[], drpc.invocations.threads=64, java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib, ui.port=8080, storm.exhibitor.poll.uripath=/exhibitor/v1/cluster/list, storm.messaging.netty.transfer.batch.size=262144, logviewer.appender.name=A1, nimbus.thrift.max_buffer_size=1048576, storm.auth.simple-acl.users.commands=[], drpc.request.timeout.secs=600}
2017-07-18 15:07:03.115 o.a.s.d.s.Slot main [WARN] SLOT DESKTOP-PDE9HPE:1027 Starting in state EMPTY - assignment null
2017-07-18 15:07:03.115 o.a.s.d.s.Slot main [WARN] SLOT DESKTOP-PDE9HPE:1028 Starting in state EMPTY - assignment null
2017-07-18 15:07:03.115 o.a.s.d.s.Slot main [WARN] SLOT DESKTOP-PDE9HPE:1029 Starting in state EMPTY - assignment null
2017-07-18 15:07:03.115 o.a.s.l.AsyncLocalizer main [INFO] Cleaning up unused topologies in C:\Users\BigTone\AppData\Local\Temp\50d72755-68f7-4d97-86e8-a1e4c1035242\supervisor\stormdist
2017-07-18 15:07:03.115 o.a.s.d.s.Supervisor main [INFO] Starting supervisor with id 93ef51bb-5109-4f38-907f-495ccc7f552d at host DESKTOP-PDE9HPE.
2017-07-18 15:07:03.146 o.a.s.d.nimbus main [WARN] Topology submission exception. (topology name='topologies\wordcount') #error {
 :cause nil
 :via
 [{:type org.apache.storm.generated.InvalidTopologyException
   :message nil
   :at [org.apache.storm.daemon.nimbus$validate_topology_name_BANG_ invoke nimbus.clj 1320]}]
 :trace
 [[org.apache.storm.daemon.nimbus$validate_topology_name_BANG_ invoke nimbus.clj 1320]
  [org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782 submitTopologyWithOpts nimbus.clj 1643]
  [org.apache.storm.daemon.nimbus$mk_reified_nimbus$reify__10782 submitTopology nimbus.clj 1726]
  [sun.reflect.NativeMethodAccessorImpl invoke0 NativeMethodAccessorImpl.java -2]
  [sun.reflect.NativeMethodAccessorImpl invoke NativeMethodAccessorImpl.java 62]
  [sun.reflect.DelegatingMethodAccessorImpl invoke DelegatingMethodAccessorImpl.java 43]
  [java.lang.reflect.Method invoke Method.java 498]
  [clojure.lang.Reflector invokeMatchingMethod Reflector.java 93]
  [clojure.lang.Reflector invokeInstanceMethod Reflector.java 28]
  [org.apache.storm.testing$submit_local_topology invoke testing.clj 310]
  [org.apache.storm.LocalCluster$_submitTopology invoke LocalCluster.clj 49]
  [org.apache.storm.LocalCluster submitTopology nil -1]
  [org.apache.storm.flux.Flux runCli Flux.java 207]
  [org.apache.storm.flux.Flux main Flux.java 98]]}

So I changed the topology name to "wordcount" in tmp.yaml.
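
The log shows Nimbus rejecting the name 'topologies\wordcount', which looks like a relative Windows path rather than a bare name. It is an assumption here that the backslash is what the validator objects to. Under that assumption, a name derived from the topology file path could be normalized as in this sketch; `topology_name_from_path` is a hypothetical helper, not a streamparse function.

```python
import ntpath


def topology_name_from_path(path):
    """Derive a bare topology name from a topology file path.

    ntpath treats both backslash and forward slash as separators, so
    the result does not depend on the platform that produced the path.
    """
    base = ntpath.basename(path)
    name, _ext = ntpath.splitext(base)
    return name
```

For the name in the log, `topology_name_from_path(r"topologies\wordcount")` gives `"wordcount"`, which matches the manual edit described above.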

storm jar D:\dev\wordcount\_build\wordcount-0.0.1-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local --no-splash --sleep 9223372036854775807 c:\users\bigtone\appdata\local\temp\tmpwodnya.yaml

  02:23:26.894 [Thread-18-count_bolt2222222-executor[2 2]] ERROR org.apache.storm.util - Async loop died!
java.lang.RuntimeException: Error when launching multilang subprocess
Traceback (most recent call last):
  File "d:\dev\python27\lib\runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "d:\dev\python27\lib\runpy.py", line 72, in _run_code
    exec code in run_globals
  File "D:\dev\Python27\Scripts\streamparse_run.exe\__main__.py", line 9, in <module>
  File "d:\dev\python27\lib\site-packages\streamparse\run.py", line 45, in main
    cls(serializer=args.serializer).run()
  File "d:\dev\python27\lib\site-packages\pystorm\bolt.py", line 68, in __init__
    super(Bolt, self).__init__(*args,**kwargs)
  File "d:\dev\python27\lib\site-packages\pystorm\component.py", line 211, in __init__
    signal.signal(rdb_signal, remote_pdb_handler)
TypeError: an integer is required
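
The traceback ends with `signal.signal(rdb_signal, remote_pdb_handler)` receiving something that is not an integer signal number. The value of `rdb_signal` is not visible in the traceback. One possibility, stated as an assumption only, is that it names a Unix-only signal such as SIGUSR1, which Python on Windows does not define. A sketch of registering a handler only when the platform defines the signal follows; `register_if_available` is a hypothetical helper, not part of pystorm.

```python
import signal


def register_if_available(signal_name, handler):
    """Register handler for signal_name only if this platform defines it.

    Returns True when the handler was installed, False when the signal
    does not exist here (for example SIGUSR1 on Windows).
    """
    signum = getattr(signal, signal_name, None)
    if signum is None:
        return False
    # signal.signal() expects the integer signal number, not its name.
    signal.signal(signum, handler)
    return True
```

Note that `signal.signal()` can only be called from the main thread, so the helper has the same restriction.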

How can I solve this problem? Can anyone help me?
Thanks in advance for any tips!
