Eugene Goldberg Eugene Goldberg - 3 months ago 12
Java Question

how to properly @Autowire data access object outside of spring boot controller?

I'm working on a Spring Boot app, which acts as a REST API, which receives JSON data, and persists it in a Postgres DB.
For that purpose, I have controllers, as well as a separate data access classes.
In my controllers, I'm using

private CusttableDao custtableDao;

to auto-wire my DAO class, which looks like this:

public class CusttableDao {

static final Logger LOG = LoggerFactory.getLogger(CusttableDao.class);

//EntityManagerFactory emfactory = Persistence.createEntityManagerFactory( "Eclipselink_JPA" );

public void update(Custtable custtable) {

public Custtable getById(Class<Custtable> class1, CusttableCompositeKey custtableCompositeKey) {
Custtable ct = entityManager.find(Custtable.class, custtableCompositeKey);
return entityManager.find(Custtable.class, custtableCompositeKey);

public void create(Custtable custtable) {

public void delete(Custtable custtable) {
if (entityManager.contains(custtable))

private EntityManager entityManager;

For the purposes of REST API service, this arrangement works well.

Recently, I've been asked to add a capability to receive data from Kafka.

For this purpose, I have constructed a ThreadPool and a Consumer classes:

public class ConsumerThreadPool {

private static final String TOPIC = "test5";
private static final Integer NUM_THREADS = 1;

private ConsumerConfigFactory consumerConfigFactory;

private ConsumerConnector consumer;
private ExecutorService threadPool;

public ConsumerThreadPool() {
threadPool = Executors.newFixedThreadPool(NUM_THREADS);

public void startConsuming() {
ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
consumer = createJavaConsumerConnector(consumerConfig);


public void consume() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, NUM_THREADS);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
threadPool.submit(new ErpConsumer(stream, threadNumber));

public class ErpConsumer implements Runnable {

private CusttableDao custtableDao;

private ObjectMapper objectMapper;
private KafkaStream<byte[], byte[]> kafkaStream;
private int threadNumber;

public ErpConsumer(KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
this.threadNumber = threadNumber;
this.kafkaStream = kafkaStream;
this.objectMapper = new ObjectMapper();

ObjectMapper mapper = new ObjectMapper();

public void run() {
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();

while (it.hasNext()) {
byte[] messageData =;
try {
String msg = new String(messageData);

JSONArray jsonArray = new JSONArray(msg);

for (int i = 0; i < jsonArray.length(); i++) {
JSONObject custtableObject = jsonArray.getJSONObject(i);
System.out.print(msg + "\n");
} catch (Exception e) {

System.out.println("Shutting down Thread: " + kafkaStream);

private void dispatchRecord(JSONObject record) throws JSONException, JsonParseException, JsonMappingException, IOException{
String changeTableName = record.getString("upk_changedtablename");

switch (changeTableName) {
case "Custtable":
Custtable custTable = mapper.readValue(record.toString(), new TypeReference<Custtable>(){});

custTable.setPartition( Long.valueOf(record.getString("upk_partition")).longValue());

Long keyfieldrecid = custTable.getUpk_keyfieldrecid();
Long partition = custTable.getUpk_partition();

if(custTable.getOptype().equals("U")) {

Custtable customer = (Custtable) custtableDao.getById(Custtable.class,
new CusttableCompositeKey

BeanUtils.copyProperties(custTable, customer);

customer.setCusttableCompositeKey(new CusttableCompositeKey



Unlike it was with the Rest Controller, using

private CusttableDao custtableDao;

does not help - the custtableDao remains null here:

Custtable customer = (Custtable) custtableDao.getById(Custtable.class,
new CusttableCompositeKey

What is the proper way for me to auto-wite/instanciate/access my DAO in my Consumer, which is not of type RestController?


Your ErpConsumer class is not a managed bean you are instantiating it yourself in this line:

threadPool.submit(new ErpConsumer(stream, threadNumber));

@Autowired only works for beans that are managed by Spring.

Instead inject your dao to your ConsumerThreadPool which is a managed bean (because of the @Component annotation). Then add the dao as a parameter in the constructor of ErpConsumer and create instances of it this way:

threadPool.submit(new ErpConsumer(custtableDao,stream, threadNumber));

Here's how your constructor might look like:

public ErpConsumer(CusttableDao custtableDao,KafkaStream<byte[], byte[]> kafkaStream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
        this.objectMapper = new ObjectMapper();
        this.custtableDao = custtableDao;