datetime—如何使用mapreduce和pyspark查找某一年中某一天的频率

pbossiut  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(337)

我有一个文本文件(61gb),每行包含一个表示日期的字符串,例如thu dec 16 18:53:32+0000 2010
在单个核心上迭代文件会花费太长时间,因此我想使用pyspark和mapreduce技术快速查找某一年中某一天的行频率。
我认为这是一个好的开始:

import dateutil.parser
text_file = sc.textFile('dates.txt')
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line)) \
        .map(lambda date: date + 1) \
        .reduceByKey(lambda a, b: a + b)

不幸的是,我不明白如何过滤某一年,并减少关键。关键是今天。
输出示例:
2014年12月16日星期四
12月17日星期四345等。

u3r8eeie

u3r8eeie1#

我应该补充一点,dateutil在python中不是标准的。如果集群上没有sudo权限,这可能会带来问题。作为解决方案,我建议使用datetime:

import datetime
def parse_line(d):
    f = "%a %b %d %X %Y"
    date_list = d.split()
    date = date_list[:4]
    date.append(date_list[5])
    date = ' '.join(date)
    return datetime.datetime.strptime(date, f)

counts = rdd.map(parse_line)\
    .map(attrgetter('year', 'month', 'day'))\
    .filter(lambda (y, m, d): y == 2015)\
    .countByValue()

我对更好的解决方案感兴趣使用:Parquet地板,行/列等。

rjee0c15

rjee0c152#

类似这样的事情也许是一个好的开始:

import dateutil.parser
text_file = sc.textFile('dates.txt')
date_freqs = text_file.map(lambda line: dateutil.parser.parse(line))
    .keyBy((_.year, _.month, _.day)) // somehow get the year, month, day to key by
    .countByKey()
v7pvogib

v7pvogib3#

在另一个回答中提到, dateutil.parser.parse 返回具有 year , month ,和 day 属性:

>>> dt = dateutil.parser.parse('Thu Dec 16 18:53:32 +0000 2010')
>>> dt.year
2010
>>> dt.month
12
>>> dt.day
16

从rdd开始:

>>> rdd = sc.parallelize([
...     'Thu Oct 21 5:12:38 +0000 2010',
...     'Thu Oct 21 4:12:38 +0000 2010',
...     'Wed Sep 22 15:46:40 +0000 2010',
...     'Sun Sep 4 22:28:48 +0000 2011',
...     'Sun Sep 4 21:28:48 +0000 2011'])

以下是如何获得所有年-月-日组合的计数:

>>> from operator import attrgetter
>>> counts = rdd.map(dateutil.parser.parse).map(
...     attrgetter('year', 'month', 'day')).countByValue()
>>> counts
defaultdict(<type 'int'>, {(2010, 9, 22): 1, (2010, 10, 21): 2, (2011, 9, 4): 2})

要获得所需的输出:

>>> for k, v in counts.iteritems():
...     print datetime.datetime(*k).strftime('%a %b %y'), v
...
Wed Sep 10 1
Thu Oct 10 2
Sun Sep 11 2

如果只想统计某一年,可以在进行统计之前过滤rdd:

>>> counts = rdd.map(dateutil.parser.parse).map(
...    attrgetter('year', 'month', 'day')).filter(
...    lambda (y, m, d): y == 2010).countByValue()
>>> counts
defaultdict(<type 'int'>, {(2010, 9, 22): 1, (2010, 10, 21): 2})

相关问题