使用concurrenthashmap的java线程安全

nbewdwxp  于 2021-07-09  发布在  Java
关注(0)|答案(5)|浏览(428)

我有下面的课。我使用concurrenthashmap。我有许多线程写入Map和一个计时器,保存在Map上的数据每5分钟。我在Map中写入条目时使用putifabsent()来实现线程安全。但是,当我从中读取并通过clear()方法删除所有条目时,我不希望在读取Map内容然后删除它们的过程中有其他线程写入Map。显然,即使使用synchronized(lock){},我的代码也不是线程安全的,b/c在saveentries()中拥有锁的线程不一定是在log()方法中写入Map的同一个线程!除非我用同一个lock对象在log()中锁定整个代码!
我想知道有没有其他方法可以在没有外部锁的情况下实现线程安全?非常感谢您的帮助。

public class Logging {

private static Logging instance;    
private static final String vendor1 = "vendor1";
private static final String vendor2 = "vendor2";    
private static long delay = 5 * 60 * 1000;

private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>();
private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>();

private Timer timer;    
private final Object lock = new Object();

private Logging(){
    timer = new Timer();                
    timer.schedule(new TimerTask() {
        public void run() {
            try {
                saveEntries();
            } catch (Throwable t) {
                timer.cancel();
                timer.purge();
            }
        }       
    }, 0, delay);
}

public static synchronized Logging getInstance(){     
    if (instance == null){
        instance = new Logging();
    }
    return instance;
 }

public void log(){      
    ConcurrentMap<String, Event> map;
    String key = "";        

    if (vendor1.equalsIgnoreCase(engine)){
        map = vendor1Calls;
    }else if(vendor2.equalsIgnoreCase(engine)){  
        map = vendor2Calls;
    }else{
        return;
    }       

    key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
    /*Event event = map.get(key);       

    // Map does not contain this service.method, create an Event for the first     time.
    if(event == null){
        event = new Event();            
        map.put(key, event);

        // Map already contains this key, just adjust the numbers.
    }else{
        // Modify the object fields
    }*/
    //}

    // Make it thread-safe using CHM
    Event newEvent = new Event();
    Event existingEvent= map.putIfAbsent(key, newEvent); 

    if(existingEvent!=null && existingEvent!=newEvent){
        // Modify the object fields
}       

private void saveEntries(){

    Map<String, List<Event>> engineCalls = null;
    try {           

        engineCalls = new HashMap<String, List<Event>>();
        List<Event> events = null;

// How can I achieve therad safety here w/o applying any lock?
        //synchronized(lock){
            if(!vendor1Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor1Calls.values());
                engineCalls.put(vendor1, events);
                vendor1Calls.clear();
            }
            if(!vendor2Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor2Calls.values());
                engineCalls.put(vendor2, events);
                vendor2Calls.clear();
            }
        //}

// logICalls() saves the events in the DB.          
        DBHandle.logCalls(engineCalls);
    } catch (Throwable t) {         
    } finally {
        if(engineCalls!=null){
            engineCalls.clear();
        }                       
    }   
}

}

t5fffqht

t5fffqht1#

下面的代码使用了来自FunctionalJava项目的持久Map。它使用更多的内存,但是(afaik:)可以由多个线程安全地使用。中唯一可变的值 AtomicReference 并使用比较集进行更新。Map和事件是不可变的,因此是线程安全的。另外,我没有清除Map,而是替换对它的引用。

import fj.F;
import fj.Ord;
import fj.data.TreeMap;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class Logging
{
    // Event is immutable
    private static class Event
    {
        // updates are done by creating new values
        Event update(String key)
        {
            return new Event();
        }
    }

    // Refactored code pertaining to one vendor into a separate class.
    private static class EngineLogger
    {
        public final String vendor;
        private final AtomicReference<TreeMap<String, Event>> mapRef =
                new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd));

        private EngineLogger(String vendor)
        {
            this.vendor = vendor;
        }

        public void log(String service, String method)
        {
            final String key = service + "." + method;
            boolean updated = false;
            while (! updated)
            {
                // get the current value of the map
                TreeMap<String, Event> currMap = mapRef.get();

                // create an updated value of the map, which is the current map plus info about the new key
                TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>()
                {
                    @Override
                    public Event f(Event event)
                    {
                        // Modify the object fields of event, if the map contains the key
                        return event.update(key);
                    }
                    // create a new event if the map does not contain the key
                }, new Event());

                // compare-and-set the new value in .. repeat until this succeeds
                updated = mapRef.compareAndSet(currMap, newMap);
            }
        }

        public List<Event> reset()
        {
            /* replace the reference with a new map */
            TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd));

            /* use the old map to generate the list */
            return new ArrayList<Event>(oldMap.toMutableMap().values());
        }
    }

    private static Logging instance;
    private static long delay = 5 * 60 * 1000;
    private final Timer timer;

    private final EngineLogger vendor1 = new EngineLogger("vendor1");
    private final EngineLogger vendor2 = new EngineLogger("vendor2");

    private Logging()
    {
        timer = new Timer();
        timer.schedule(new TimerTask()
        {
            public void run()
            {
                try
                {
                    saveEntries();
                }
                catch (Throwable t)
                {
                    timer.cancel();
                    timer.purge();
                }
            }
        }, 0, delay);
    }

    public static synchronized Logging getInstance()
    {
        if (instance == null)
        {
            instance = new Logging();
        }
        return instance;
    }

    public void log(String engine, String service, String method)
    {
        if (vendor1.vendor.equals(engine))
        {
            vendor1.log(service, method);
        }
        else if (vendor2.vendor.equals(engine))
        {
            vendor2.log(service, method);
        }
    }

    private void saveEntries()
    {
        Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>();
        engineCalls.put(vendor1.vendor, vendor1.reset());
        engineCalls.put(vendor2.vendor, vendor2.reset());
        DBHandle.logCalls(engineCalls);
    }
}
swvgeqrz

swvgeqrz2#

但是,当我从中读取并通过clear()方法删除所有条目时,我不希望在读取Map内容然后删除它们的过程中有其他线程写入Map。
我想你想说的是你并不在乎严格锁定Map。相反,您只关心vender1calls.values()和vendor1calls.clear()之间日志条目的丢失,对吗?
在这种情况下,我可以想象你可以取代

events.addAll(vendor1Calls.values());
vendor1Calls.clear();

在saveentries中:

for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) {
    Event e = iter.next();
    events.add(e);
    iter.remove();
}

这样,您只删除已添加到事件列表中的事件。当saveentries()仍在执行时,仍然可以写入vendor1callsMap,但迭代器会跳过添加的值。

js4nwp54

js4nwp543#

我最好的建议是使用readwritelock,但是您特别声明您不想使用任何锁(顺便说一句) ConcurrentHashMap 你可以尝试以下方法。
对每个Map使用原子引用,当需要记录其内容时,使用getandset将旧Map替换为全新的空Map。
您现在有了一个独占使用的Map,您可以随意遍历和清除它。不幸的是,有一个小问题(使用锁可以解决这个问题),那就是当您用一个空的线程交换Map时,另一个线程正在添加到Map中。您也许可以在此时添加一个延迟,希望等待足够长的时间让另一个线程完成它正在做的事情。也许有一个 ConcurrentHashMap 你可以使用等待,直到每个人都完成它。

kulphzqa

kulphzqa4#

没有任何外部同步,chm就无法实现这一点。返回的迭代器视图是弱一致的,这意味着在实际对Map进行迭代时,Map的内容可能会发生更改。
看来你需要使用 Collections.synchronizedMap 以获得所需的功能。
编辑以使我的观点更清楚:
为了达到这个目的 synchronizedMap 你首先得 synchronize 在Map上,然后您可以迭代或复制内容到另一个Map,然后清除。

Map map = Collections.synchronizedMap(new HashMap());

public void work(){
  Map local = new HashMap();
  synchronized(map){
     local.putAll(map);
     map.clear();
  }
  //do work on local instance 
}

而不是 local 例如,正如我提到的,你可以 iterate + remove 类似于@kevin jin的答案。

vwkv1x7d

vwkv1x7d5#

此线程中显示了此示例的原子版本(仅使用concurrentmap中的功能)。

相关问题