我尝试向cassandra写入一些数据,建立了一个连接,这是我的数据库模式:
CREATE TABLE IF NOT EXISTS road_traffic (
road_id INT,
timestamp TIMESTAMP,
radar_id INT,
vehicles MAP<INT, FROZEN<UDTVehicle>>,
PRIMARY KEY (road_id, timestamp) // partition key and clustering key
);
CREATE TYPE IF NOT EXISTS UDTVehicle (
num_vehicles INT,
speed LIST<FLOAT>
);
这是我的Python代码:
def write_to_cassandra(self,session, record):
insert_query = """
INSERT INTO road_traffic (road_id, timestamp, radar_id, vehicles)
VALUES (?, ?, ?, ?);
"""
prepared_insert = session.prepare(insert_query)
batch = BatchStatement()
batch_size = 0
batches= 0
print(record)
if not all(record.get(field) is not None for field in list(record.keys())) :
logger.warning("Record is missing a required fields")
return
else :
vehicles_map = {record.get('road_id'): record.get('Vehicles')}
print(record.get('road_id'))
batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
batch_size += 1
据我所知,vehicles_map必须是一个键(分区键)和一个值(UDTVehicle对象)。当我打印记录时,我得到了这个:
{'timestamp': datetime.datetime(2022, 1, 1, 8, 0, 30), 'num_Vehicles': 7, 'road_id': 9, 'radar_id': 11, 'Vehicles': {'num_Vehicles': 7, 'speed': [72.78, 62.67, 85.15, 75.51, 83.95, 76.39, 57.92]}}
我得到的错误如下:
Traceback (most recent call last):
File "cassandra_kafka/ingest_cassandra.py", line 135, in <module>
Data_ingest.consume_store()
File "cassandra_kafka/ingest_cassandra.py", line 113, in consume_store
self.write_to_cassandra(session, value)
File "cassandra_kafka/ingest_cassandra.py", line 66, in write_to_cassandra
batch.add(prepared_insert, (record.get('road_id'), record.get('timestamp'), record.get('radar_id'), vehicles_map))
File "cassandra/query.py", line 827, in cassandra.query.BatchStatement.add
File "cassandra/query.py", line 506, in cassandra.query.PreparedStatement.bind
File "cassandra/query.py", line 636, in cassandra.query.BoundStatement.bind
File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
File "cassandra/cqltypes.py", line 909, in cassandra.cqltypes.MapType.serialize_safe
File "cassandra/cqltypes.py", line 324, in cassandra.cqltypes._CassandraType.to_binary
File "cassandra/cqltypes.py", line 799, in cassandra.cqltypes._ParameterizedType.serialize
File "cassandra/cqltypes.py", line 1030, in cassandra.cqltypes.UserType.serialize_safe
KeyError: 0
我做错了什么?任何帮助将不胜感激。
1条答案
按热度按时间13z8s7eq1#
在Python驱动程序中插入包含UDT的行的正确方法是使用与UDT具有相同结构的简单类,并在为insert语句创建值时使用它。
我准备了一个简单的代码来演示我的意思:检查
UdtVehicle
类以及在批处理中为insert语句创建参数时如何示例化它。根据传递的命令行arg,示例代码将继续演示其他一些内容。
read
显示了当“只是”按原样阅读行时得到的结果,read_udt
显示了如何在集群中注册UDT,并将返回的行很好地转换为Python类,insert
是对单行的健全性检查(=非批处理)准备好的插入语句(UDT按照上面的解释正确处理),而insertb
在批处理中验证了前一个case的使用。有关处理UDT的更多信息,请查看此页面:https://docs.datastax.com/en/developer/python-driver/latest/user_defined_types。请注意,对于 * prepared * 语句,您需要稍微不同的方法(无论如何,您可能希望在生产应用程序中使用prepared语句)。
查看上面的代码,您可能希望在您发布的函数中将车辆转换为UDT(取决于您通过Kafka流接收的确切内容)。请注意,只有显式调用
session.execute(batch)
(代码中没有显示)后,才会执行批处理。另一对夫妇的意见,为您的意识:
1.不要在每次写操作时都准备语句:这是一个消耗资源的反模式。一旦准备好语句,就将其缓存在某处(
self.prepared_statement
是一个自然的候选者),然后使用它以获得更好的性能1.你构造的UDT不能保证
num_vehicles == len(speed)
。如果应该强制执行,可能不同的模型会更好(但这又取决于您的用例)1.在这种情况下,要格外小心地评估你是否真的需要一批。Cassandra中的批处理不是一种加速不分青红皂白的批量插入的方法!为此,您可以只发出一些并发的简单写入,驱动程序将负责其余的工作。只要给定批处理中的语句涉及不同的分区(在您的示例中,是不同的
road_id
值),那么可能要避免单个批处理。在此阅读更多信息:https://batey.info/cassandra-anti-pattern-cassandra-logged.html,https://www.batey.info/cassandra-anti-pattern-misuse-of.html .下面是你可以开始的示例代码。(在Cassandra 4.1上测试)