david david - 15 days ago 6
Java Question

initialize all the metadata at once

I have a Singleton class which connects to Cassandra. I want to initialize

processMetadata
,
procMetadata
and
topicMetadata
all at once not one by one. If they gets initialize all at once then I will see consistent values from all those three not different values for either of them.

In the below code,
processMetadata
,
procMetadata
and
topicMetadata
is initialized for the first time inside
initializeMetadata
method and then it gets updated every 15 minutes.

public class CassUtil {
private static final Logger LOGGER = Logger.getInstance(CassUtil.class);
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

// below are my three metedata which I need to update all three at once not one by one
private List<ProcessMetadata> processMetadata = new ArrayList<>();
private List<ProcMetadata> procMetadata = new ArrayList<>();
private List<String> topicMetadata = new ArrayList<>();

private Session session;
private Cluster cluster;

private static class Holder {
private static final CassUtil INSTANCE = new CassUtil();
}

public static CassUtil getInstance() {
return Holder.INSTANCE;
}

private CassUtil() {
List<String> servers = TestUtils.HOSTNAMES;
String username = TestUtils.USERNAME;
String password = TestUtils.PASSWORD;

PoolingOptions opts = new PoolingOptions();
opts.setCoreConnectionsPerHost(HostDistance.LOCAL,
opts.getCoreConnectionsPerHost(HostDistance.LOCAL));

Builder builder = Cluster.builder();
cluster =
builder
.addContactPoints(servers.toArray(new String[servers.size()]))
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withPoolingOptions(opts)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(
!TestUtils.isProduction() ? "DC2" : TestUtils.getCurrentLocation()
.get().name().toLowerCase()).build())
.withCredentials(username, password).build();

try {
session = cluster.connect("testkeyspace");
} catch (NoHostAvailableException ex) {
LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex));
} catch (Exception ex) {
LOGGER.logError("error= " + ExceptionUtils.getStackTrace(ex));
}
}

// start a background thread which runs every 15 minutes
public void startScheduleTask() {
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
processMetadata = processMetadata(true);
topicMetadata = listOfTopic(TestUtils.GROUP_ID);
procMetadata = procMetadata();
} catch (Exception ex) {
LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex));
}
}
}, 0, 15, TimeUnit.MINUTES);
}

// called from main thread to initialize the metadata
// and start the background thread where it gets updated
// every 15 minutes
public void initializeMetadata() {
processMetadata = processMetadata(true);
topicMetadata = listOfTopic(TestUtils.GROUP_ID);
procMetadata = procMetadata();
startScheduleTask();
}

private List<String> listOfTopic(final String consumerName) {
List<String> listOfTopics = new ArrayList<>();
String sql = "select topics from topic_metadata where id=1 and consumerName=?";
try {
// get data from cassandra
} catch (Exception ex) {
LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex), ", Consumer Name= ",
consumerName);
}
return listOfTopics;
}

private List<ProcessMetadata> processMetadata(final boolean flag) {
List<ProcessMetadata> metadatas = new ArrayList<>();
String sql = "select * from process_metadata where id=1 and is_active=?";
try {
// get data from cassandra
} catch (Exception ex) {
LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex), ", active= ", flag);
}
return metadatas;
}

private List<ProcMetadata> procMetadata() {
List<ProcMetadata> metadatas = new ArrayList<>();
String sql = "select * from schema where id=1";
try {
// get data from cassandra
} catch (SchemaParseException ex) {
LOGGER.logError("schema parsing error= ", ExceptionUtils.getStackTrace(ex));
} catch (Exception ex) {
LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex));
}
return metadatas;
}

public List<ProcessMetadata> getProcessMetadata() {
return processMetadata;
}

public List<String> getTopicMetadata() {
return topicMetadata;
}

public List<ProcMetadata> getProcMetadata() {
return procMetadata;
}
}


So from my main thread, I call
initializeMetadata
method only once which initializes those three metadata and then it starts a background thread which updates them every 15 minutes. Afer that I was using them like below from my multiple threads:

CassUtil.getInstance().getProcessMetadata();
CassUtil.getInstance().getTopicMetadata();
CassUtil.getInstance().getProcMetadata();


Problem Statement:-

Now I want to see same state of
processMetadata
,
topicMetadata
and
procMetadata
. Meaning these three metadata should be updated at same time not one after other bcoz I don't want to see mix state value for them after I do get on them.

How can I avoid this issue? Do I need to create another class which will hold these three metadata as constructor parameter?

Answer

The most efficient way to keep a consistent state of your lists can be to use an immutable class that will hold your 3 lists, you will then have a field of this type in your class that you will define volatile to make sure that all threads see the latest update of this field.

Here is for example the immutable class that we use to hold the state of the lists (it could be an ordinary class but as it is implementation specific it could be a static inner class):

private static class State {
    private final List<ProcessMetadata> processMetadata;
    private final List<ProcMetadata> procMetadata;
    private final List<String> topicMetadata;

    public State(final List<ProcessMetadata> processMetadata, 
        final List<ProcMetadata> procMetadata, final List<String> topicMetadata) {
        this.processMetadata = new ArrayList<>(processMetadata);
        this.procMetadata = new ArrayList<>(procMetadata);
        this.topicMetadata = new ArrayList<>(topicMetadata);
    }
    // Getters
}

Then your class would be something like that:

public class CassUtil {
    ...
    private volatile State state = new State(
        new ArrayList<>(), new ArrayList<>(), new ArrayList<>()
    );
    ...
    public void startScheduleTask() {
        ...
                    this.state = new State(
                        processMetadata(true), listOfTopic(TestUtils.GROUP_ID), 
                        procMetadata()
                    );
        ...
    }
    ...
    public void initializeMetadata() {
        this.state = new State(
            processMetadata(true), listOfTopic(TestUtils.GROUP_ID), procMetadata()
        );
        startScheduleTask();
    }
    ...

    public List<ProcessMetadata> getProcessMetadata() {
        return this.state.getProcessMetadata();
    }

    public List<String> getTopicMetadata() {
        return this.state.getTopicMetadata();
    }

    public List<ProcMetadata> getProcMetadata() {
        return this.state.getProcMetadata();
    }