为什么我在cassandra数据库中的数据插入有时是稳定的,有时是缓慢的?

bq8i3lrv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(419)

如果当前数据id在cassandra数据库中存在或不存在,这是我的查询:

row = session.execute("SELECT * FROM articles where id = %s", [id])

解析kafka中的消息,然后确定此消息是否存在于cassandra数据库中如果不存在,则应执行插入操作,如果确实存在,则不应将其插入数据中。

messages = consumer.get_messages(count=25)

if len(messages) == 0:
    print 'IDLE'
    sleep(1)
    continue

for message in messages:
    try:
        message = json.loads(message.message.value)
        data = message['data']
        if data:
            for article in data:
                source = article['source']
                id = article['id']
                title = article['title']
                thumbnail = article['thumbnail']
                #url = article['url']
                text = article['text']
                print article['created_at'],type(article['created_at'])
                created_at = parse(article['created_at'])
                last_crawled = article['last_crawled']
                channel = article['channel']#userid
                category = article['category']
                #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0)
                scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0)
                row = session.execute("SELECT * FROM articles where id = %s", [id])
                if len(list(row))==0:
                #id parse base62
                    ids = [id[0:2],id[2:9],id[9:16]]
                    idstr=''
                    for argv in ids:
                        num = int(argv)
                        idstr=idstr+encode(num)
                    url='http://weibo.com/%s/%s?type=comment' % (channel,idstr)
                    session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))
                    session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id))
                    log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))

    except Exception, e:
        log.exception(e)
        #log.info('error %s %s' % (message['url'],body))
        print e
        continue

我有一个id,它只有一个唯一的表行,我希望这样。一旦我为唯一id添加不同的预定时间,我的系统就会崩溃。添加此 if len(list(row))==0: 是正确的想法,但我的系统是非常缓慢之后。
这是我的表格说明:

DROP TABLE IF EXISTS schedules;

CREATE TABLE schedules (
 source text,
 type text,
 scheduled_for timestamp,
 id text,
 PRIMARY KEY (source, type, scheduled_for, id)
);

此计划的\u是可更改的。这里还有一个具体的例子:

Hao article 2016-01-12 02:09:00+0800 3930462206848285
Hao article 2016-01-12 03:09:00+0801 3930462206848285
Hao article 2016-01-12 04:09:00+0802 3930462206848285
Hao article 2016-01-12 05:09:00+0803 3930462206848285

以下是我的文章cql模式:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);
rjee0c15

rjee0c151#

看看您的模式和使用它的方式,我可以假设id字段上的二级索引正在产生问题并减慢查询速度。您可以在许多地方查看为什么二级索引不好的更多细节,只需在google上搜索即可(这个源代码是一个很好的开始,也是datastax文档页)。基本上,当您在5节点集群中使用辅助索引时,您必须点击每个节点以找到您要查找的项目,并且当使用主键时,每个节点都知道哪个节点保存数据。
如果使用基数较高的数据(添加更多项时性能会下降),并且使用每个项目都不同的id,则二级索引尤其糟糕。当您使用低基数(如按星期几索引某些数据)时(您知道一周只有7天,因此您可以预测索引表的大小),或者在您的情况下使用类别(如果类别数量有限)时,它们是可以的。
我建议再创建一个表, article_by_id 它将是文章表的反向索引。您可以使用轻量级事务 INSERT ... IF NOT EXISTS 首先返回该表,如果操作返回 true (意思是插入过程中,记录以前不存在)您可以定期插入 articles 如果它返回 false (意味着数据已存在,因此未插入)可以跳过插入 articles table。
下面是一个表(我建议使用uuid而不是text作为id,但我是基于您的文章表创建的表):

CREATE TABLE article_by_id (
    id text,
    source text,
    created_at timestamp,
    PRIMARY KEY (id)
) WITH comment = 'Article by id.';

通过这种方式,您可以始终根据id找到键的所有部分。如果id是您的输入参数,则从该表中选择将为您提供源和创建位置。
下面是insert查询,它将返回true或false:

INSERT INTO article_by_id(id, source, created_at) VALUES (%s,%s, %s) IF NOT EXISTS;

还有更多提示,如果您可以根据实体中的一些不可更改的数据来查找键,则不需要第二个表。在本例中,如果source和created的\u at唯一地标识系统中的项目并且从不更改,则可以删除id并使用原始表。

相关问题