I am trying to receive data from Kafka and process it. First, I tried Kafka with Spark Streaming: Spark Streaming works fine with the Kafka producer on "localhost:9092", so I am confident that Spark on my local machine and the Kafka producer running in docker work well together.
However, when I try Kafka with Apache Beam, I get an error. I verified that the Spark job server is running successfully in docker, yet I still get the error shown below. I would appreciate your help.
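For reference, the working Spark check looks roughly like the sketch below (a minimal illustration using Structured Streaming; the topic name and session settings are placeholders, not my exact test script):

# Minimal sketch of the working Spark Streaming check (illustrative only).
# Needs the Kafka connector on the Spark classpath, e.g.
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark version>
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-smoke-test").getOrCreate()

# Read from the dockerized broker on localhost:9092 and print values to the console.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my_topic")   # placeholder topic name
      .load())

query = (df.selectExpr("CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())
query.awaitTermination()

The Beam pipeline that fails is: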
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.kafka import ReadFromKafka
from environment_variables import *  # provides TOPICS


def run_pipeline():
    options = PipelineOptions(
        runner='SparkRunner',
        job_endpoint='localhost:8099',
        environment_type='LOOPBACK')
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as p:
        messages = p | "kafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": 'localhost:9092'},
            topics=[TOPICS],
            expansion_service='localhost:8097')
        messages | "print" >> beam.Map(print)


run_pipeline()
Error:
Traceback (most recent call last):
File "X:/Git_repo/project_red/Beam_streaming/junk6.py", line 18, in <module>
run_pipeline()
File "X:/Git_repo/project_red/Beam_streaming/junk6.py", line 16, in run_pipeline
p | "kafka" >> beam.io.kafka.ReadFromKafka(consumer_config={"bootstrap.servers": 'localhost:9092'},topics=[TOPICS], expansion_service='localhost:8097')
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\ptransform.py", line 1058, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\ptransform.py", line 573, in __ror__
result = p.apply(self, pvalueish, label)
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\pipeline.py", line 646, in apply
return self.apply(transform, pvalueish)
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\pipeline.py", line 689, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\runners\runner.py", line 188, in apply
return m(transform, input, options)
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\runners\runner.py", line 218, in apply_PTransform
return transform.expand(input)
File "X:\Git_repo\project_red\venv\lib\site-packages\apache_beam\transforms\external.py", line 316, in expand
response = service.Expand(request)
File "X:\Git_repo\project_red\venv\lib\site-packages\grpc\_channel.py", line 923, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "X:\Git_repo\project_red\venv\lib\site-packages\grpc\_channel.py", line 826, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1615358925.981000000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":5397,"referenced_errors":[{"created":"@1615358925.981000000","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":398,"grpc_status":14}]}"
>
The job server is running in docker.