Spark Structured Streaming: Kafka Avro to HBase

woobm2wo · posted 2021-06-08 in Kafka

As described in "Spark Structured Streaming with HBase integration", I am interested in writing data to HBase from within the Structured Streaming framework. I cloned the SHC code from GitHub, extended it with a streaming sink provider, and tried to write records to HBase. I get the error: "Queries with streaming sources must be executed with writeStream.start()". My Python code is below:
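As far as I understand, Spark raises this AnalysisException whenever a batch-style action (count(), collect(), df.write, ...) is invoked on a DataFrame that has a streaming source; only writeStream.start() is allowed to trigger execution. A minimal reproduction with the built-in 'rate' source:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ReproduceError').getOrCreate()
stream_df = spark.readStream.format('rate').load()  # any streaming source
stream_df.count()  # AnalysisException: Queries with streaming sources must be executed with writeStream.start()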

import io

import avro.io
import avro.schema
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType, StringType, StructField, StructType

spark = SparkSession \
    .builder \
    .appName("SparkConsumer") \
    .getOrCreate()

# schema_name, brokers, topic and group_id are defined elsewhere (e.g. parsed from command-line arguments)
print('read Avro schema from file: {}...'.format(schema_name))
schema = avro.schema.parse(open(schema_name, 'rb').read())
reader = avro.io.DatumReader(schema)
print('the schema is read')

rows = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', brokers) \
    .option('subscribe', topic) \
    .option('group.id', group_id) \
    .option('maxOffsetsPerTrigger', 1000) \
    .option("startingOffsets", "earliest") \
    .load()
rows.printSchema()
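For reference, the Kafka source always exposes the same fixed schema, with the message payload in the binary value column (which is why the Avro decoding below operates on value). rows.printSchema() prints:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)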

# Spark SQL schema of the decoded records (this reassigns `schema`; `reader` above already holds the parsed Avro schema)
schema = StructType([
    StructField('consumer_id', StringType(), False),
    StructField('audit_system_id', StringType(), False),
    StructField('object_path', StringType(), True),
    StructField('object_type', StringType(), False),
    StructField('what_action', StringType(), False),
    StructField('when', LongType(), False),
    StructField('where', StringType(), False),
    StructField('who', StringType(), True),
    StructField('workstation', StringType(), True)
])

# Decode one Avro-encoded Kafka message into a tuple matching `schema`
def decode_avro(msg):
    bytes_reader = io.BytesIO(bytes(msg))
    decoder = avro.io.BinaryDecoder(bytes_reader)
    data = reader.read(decoder)
    return (
        data['consumer_id'],
        data['audit_system_id'],
        data['object_path'],
        data['object_type'],
        data['what_action'],
        data['when'],
        data['where'],
        data['who'],
        data['workstation']
    )

udf_decode_avro = udf(decode_avro, schema)
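As a side note, on Spark 3.0+ with the org.apache.spark:spark-avro package on the classpath, the Python UDF could probably be replaced by the built-in from_avro function. A sketch, assuming the messages are plain Avro binary without a schema-registry header (which is what the DatumReader usage above implies):

from pyspark.sql.avro.functions import from_avro

avro_schema_json = open(schema_name).read()  # the same Avro schema, as JSON text
changes = rows \
    .select(from_avro(col('value'), avro_schema_json).alias('change')) \
    .select('change.*')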

values = rows.select('value')
values.printSchema()

changes = values.withColumn('change', udf_decode_avro(col('value'))).select('change.*')
changes.printSchema()

change_catalog = '''
{
    "table":
    {
        "namespace": "uba_input",
        "name": "changes"
    },
    "rowkey": "consumer_id",
    "columns":
    {
        "consumer_id": {"cf": "rowkey", "col": "consumer_id", "type": "string"},
        "audit_system_id": {"cf": "data", "col": "audit_system_id", "type": "string"},
        "object_path": {"cf": "data", "col": "object_path", "type": "string"},
        "object_type": {"cf": "data", "col": "object_type", "type": "string"},
        "what_action": {"cf": "data", "col": "what_action", "type": "string"},
        "when": {"cf": "data", "col": "when", "type": "bigint"},
        "where": {"cf": "data", "col": "where", "type": "string"},
        "who": {"cf": "data", "col": "who", "type": "string"},
        "workstation": {"cf": "data", "col": "workstation", "type": "string"}
    }
}'''

query = changes \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', change_catalog) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .start()

# .format('org.apache.spark.sql.execution.datasources.hbase') \
# query = changes \
#     .writeStream \
#     .format('console') \
#     .start()

query.awaitTermination()
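A workaround I am also considering, as a sketch (assuming Spark 2.4+ and the stock SHC batch writer; the 'newtable' option is only needed if the HBase table does not exist yet): write each micro-batch with foreachBatch instead of a custom streaming sink provider:

def write_batch_to_hbase(batch_df, batch_id):
    # each micro-batch is a regular (non-streaming) DataFrame, so the SHC batch writer can be used
    batch_df.write \
        .options(catalog=change_catalog, newtable='5') \
        .format('org.apache.spark.sql.execution.datasources.hbase') \
        .save()

query = changes \
    .writeStream \
    .outputMode('append') \
    .foreachBatch(write_batch_to_hbase) \
    .option('checkpointLocation', '/tmp/checkpoint') \
    .start()
query.awaitTermination()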

No answers yet.
