java并发:多个作者,一个读者

4bbkushb  于 2023-05-05  发布在  Java
关注(0)|答案(9)|浏览(127)

我需要在我的软件中收集一些统计数据,我正在努力使它快速和正确,这并不容易(对我来说!)
到目前为止,我的第一个代码包含两个类,一个StatsService和一个StatsHarvester

public class StatsService
{
private Map<String, Long>   stats   = new HashMap<String, Long>(1000);

public void notify ( String key )
{
    Long value = 1l;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map<String, Long> getStats ( )
{
    Map<String, Long> copy;
    synchronized (stats)
    {
        copy = new HashMap<String, Long>(stats);
        stats.clear();
    }
    return copy;
}
}

这是我的第二个类,一个收集器,它不时地收集统计数据并将它们写入数据库。

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map<String, Long> stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}

在运行时,它将有大约30个并发运行的线程,每个线程调用notify(key)大约100次。只有一个StatsHarvester正在调用statsService.getStats()
所以我有很多作家,只有一个读者。如果有准确的统计数据就好了,但是我不在乎在高并发的情况下是否丢失了一些记录。
读者应该每5分钟或任何合理的时间运行一次。
写作应该尽可能快。阅读应该很快,但如果它锁定约300毫秒每5分钟,它的罚款。
我读过很多文档(Java并发实践,有效的Java等等),但我强烈地感觉到我需要你的建议来做好它。
我希望我能清楚而简短地说明我的问题,以获得有价值的帮助。

编辑

感谢所有人的详细和有益的回答。正如我所料,有不止一种方法可以做到这一点。
我测试了你的大部分提议(那些我理解的),并上传了一个测试项目到Google Code以供进一步参考(Maven项目)
http://code.google.com/p/javastats/
我已经测试了StatsService的不同实现

  • HashMapStatsService(HMSS)
  • ConcurrentHashMapStatsService(CHMSS)
  • LinkedQueueStatsService(LQSS)
  • GoogleStatsService(GSS)
  • ExecutorConcurrentHashMapStatsService(ECHMSS)
  • ExecutorHashMapStatsService(EHMSS)

我用x线程数测试了它们,每个线程调用y次,结果以毫秒为单位

10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000 
GSS       1        5        17       7        21       117      7        37       254       Summe: 466
ECHMSS    1        6        21       5        32       132      8        54       249       Summe: 508
HMSS      1        8        45       8        52       233      11       103      449       Summe: 910
EHMSS     1        5        24       7        31       113      8        67       235       Summe: 491
CHMSS     1        2        9        3        11       40       7        26       72        Summe: 171
LQSS      0        3        11       3        16       56       6        27       144       Summe: 266

现在我想我会使用ConcurrentHashMap,因为它提供了良好的性能,同时也很容易理解。
感谢您的所有输入!扬宁

2w3kk1z5

2w3kk1z51#

正如jack所回避的,您可以使用java.util.concurrent库,其中包括ConcurrentHashMap和AtomicLong。你可以把AtomicLong放进去,如果没有的话,你可以递增这个值。由于AtomicLong是线程安全的,您将能够递增变量而无需担心并发问题。

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

这应该是快速和线程安全的
编辑:稍微重构,所以最多只有两个查找。

1tu0hz3e

1tu0hz3e2#

为什么不使用java.util.concurrent.ConcurrentHashMap<K, V>?它在内部处理所有事情,避免了Map上无用的锁,为您节省了大量工作:你不必关心get和put的同步。
来自文档:
支持检索的完全并发性和可调整的更新预期并发性的哈希表。该类遵循与Hashtable相同的功能规范,并包含与Hashtable的每个方法对应的方法版本。然而,即使所有操作都是线程安全的,检索操作也不需要锁定,并且不支持以阻止所有访问的方式锁定整个表。
您可以指定其 * 并发级别 *:

更新操作之间允许的并发由可选concurrencyLevel构造函数参数(默认为16)指导,该参数用作内部调整大小的提示。该表在内部进行分区,以尝试允许指定数量的并发更新而不发生争用。由于哈希表中的位置基本上是随机的,因此实际的并发性会有所不同。理想情况下,您应该选择一个值,以容纳将同时修改表的线程数。使用明显高于需要的值会浪费空间和时间,而使用明显低于需要的值会导致线程争用。但在一个数量级内的高估和低估通常不会产生太大的明显影响。当已知只有一个线程将修改而所有其他线程将只读取时,值1是合适的。此外,调整此哈希表或任何其他类型的哈希表的大小是一个相对较慢的操作,因此,如果可能,最好在构造函数中提供预期表大小的估计值。

正如注解中所建议的,请仔细阅读ConcurrentHashMap的文档,特别是当它说明原子操作或非原子操作时。
为了保证原子性,你应该考虑哪些操作是原子的,从ConcurrentMap接口你会知道:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

可以安全使用。

mxg2im7a

mxg2im7a3#

我建议看看Java的util.concurrent库。我认为你可以更清楚地实现这个解决方案。我觉得你根本不需要Map。我建议使用ConcurrentLinkedQueue来实现。每个“生产者”都可以自由地写入这个队列,而不用担心其他人。它可以将一个对象放在队列中,并将其数据用于统计。
收集器可以消耗队列,不断地提取数据并处理它。然后它可以根据需要存储它。

xxhby3vn

xxhby3vn4#

克里斯·戴尔的回答看起来是一个很好的方法。
另一种选择是使用并发Multiset。在Google Collections library中有一个。您可以按如下方式使用它:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

看看source,这是使用ConcurrentHashMapputIfAbsent以及replace的三参数版本来实现的,以检测并发修改并重试。

qlckcl4x

qlckcl4x5#

解决这个问题的另一种方法是通过线程限制来利用(微不足道的)线程安全性。基本上创建一个单独的后台线程,负责阅读。它在可扩展性和简单性方面具有相当好的特性。
其思想是,不是所有线程都试图直接更新数据,而是它们产生一个“更新”任务供后台线程处理。同一个线程也可以执行读取任务,前提是处理更新时的一些延迟是可以容忍的。
这种设计非常好,因为线程将不再需要争夺锁来更新数据,并且由于Map仅限于单个线程,因此您可以简单地使用普通的HashMap来执行get/put等操作。就实现而言,这将意味着创建单线程执行器,并提交也可以执行可选的“collectAndSave”操作的写入任务。
代码草图可能如下所示:

public class StatsService {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}

有一个与执行器关联的BlockingQueue,每个为StatsService生成任务的线程都使用BlockingQueue。关键点是这样的:此操作的锁定持续时间应该比原始代码中的锁定持续时间短得多,因此争用应该少得多。总的来说,它应该会带来更好的吞吐量和延迟。
另一个好处是,由于只有一个线程读写Map,因此可以使用普通的HashMap和原始的long类型(不涉及ConcurrentHashMap或原子类型)。这也大大简化了实际处理它的代码。
希望能帮上忙。

hrysbysz

hrysbysz6#

你看过ScheduledThreadPoolExecutor吗?您可以使用它来调度编写器,这些编写器都可以写入并发集合,例如@Chris Dail提到的ConcurrentLinkedQueue。您可以在必要时从Queue中读取一个单独的调度作业,Java SDK应该处理几乎所有的并发问题,而不需要手动锁定。

z0qdvdin

z0qdvdin7#

如果我们忽略获取部分而专注于写入,那么程序的主要瓶颈是统计数据被锁定在一个非常粗糙的粒度级别。如果两个线程想要更新不同的键,它们必须等待。
如果你提前知道键的集合,并且可以预初始化Map,这样当更新线程到达时,键就可以保证存在,你就可以锁定accumulator变量而不是整个Map,或者使用线程安全的accumulator对象。
有一些map实现是专门为并发性设计的,可以为您实现更细粒度的锁定,而不是您自己实现。
但是需要注意的是统计数据,因为您需要在大致相同的时间获得所有累加器上的锁。如果使用现有的并发友好Map,则可能存在用于获取快照的构造。

eivgtgni

eivgtgni8#

使用ReentranReadWriteLock实现这两种方法的另一种选择。如果您需要清除计数器,则此实现可防止getStats方法中的争用条件。它还从getStats中删除了可变的AtomicLong,并使用了不可变的Long。

public class StatsService {

    private final Map<String, AtomicLong> stats = new HashMap<String, AtomicLong>(1000);
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public void  notify(final String key) {
        r.lock();
        AtomicLong count = stats.get(key);
        if (count == null) {
            r.unlock();
            w.lock();
            count = stats.get(key);
            if(count == null) { 
                count = new AtomicLong();
                stats.put(key, count);
            }
            r.lock();
            w.unlock();
        }
        count.incrementAndGet();
        r.unlock();
    }

    public Map<String, Long> getStats() {
        w.lock();

        Map<String, Long> copy = new HashMap<String, Long>();
        for(Entry<String,AtomicLong> entry : stats.entrySet() ){
                copy.put(entry.getKey(), entry.getValue().longValue());
        }
        stats.clear();
        w.unlock();

        return copy;
    }
}

我希望这对你有帮助,欢迎任何评论!

snvhrwxg

snvhrwxg9#

下面介绍如何在对被测线程的性能影响最小的情况下完成此操作。这是Java中可能的最快解决方案,而无需求助于特殊的硬件寄存器进行性能计数。
让每个线程独立于其他线程向某个stats对象输出其stats(即不同步)。使包含计数的字段可变,因此它是内存隔离的:

class Stats
{
   public volatile long count;
}

class SomeRunnable implements Runnable
{
   public void run()
   {
     doStuff();
     stats.count++;
   }
}

有另一个线程,持有所有Stats对象的引用,定期绕过它们,并将所有线程的计数相加:

public long accumulateStats()
{
   long count = previousCount;

   for (Stats stat : allStats)
   {
       count += stat.count;
   }

   long resultDelta = count - previousCount;
   previousCount = count;

   return resultDelta;
}

这个gatherer线程还需要添加一个sleep()(或其他一些节流器)。例如,它可以周期性地向控制台输出计数/秒,以便为您提供应用程序执行情况的“实时”视图。
这样可以尽可能地避免同步开销。
另一个要考虑的技巧是将Stats对象填充到128(或SandyBridge或更高版本上的256字节),以便在不同的缓存行上保持不同的线程计数,否则CPU上将存在缓存争用。
当只有一个线程读取和一个线程写入时,您不需要锁或原子,volatile就足够了。当stats读取器线程与正在测量的线程的CPU缓存行交互时,仍然会有一些线程争用。这是无法避免的,但这是对运行线程影响最小的方法;可能每秒读取一次或更少。

相关问题