dan-mi-sun dan-mi-sun - 3 months ago 26
Scala Question

ERROR: Phantom-dsl BatchQuery Unspecified with Overloaded method

I am attempting to extend my application to include another Cassandra table for storing the Transactions included in each Block.

I have tried to keep the code snippets succinct and relevant. If there is further code context required - just let me know.


phantomVersion = "1.22.0"
cassandraVersion = "2.1.4"





I am getting the following compilation error with the code listed below. Insights greatly appreciated.

[error] /home/dan/projects/open-blockchain/scanner/src/main/scala/org/dyne/danielsan/openblockchain/data/database/Database.scala:30: overloaded method value add with alternatives:
[error] (batch: com.websudos.phantom.batch.BatchQuery[_])com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified] <and>
[error] (queries: Iterator[com.websudos.phantom.builder.query.Batchable with com.websudos.phantom.builder.query.ExecutableStatement])(implicit session: com.datastax.driver.core.Session)com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified] <and>
[error] (queries: com.websudos.phantom.builder.query.Batchable with com.websudos.phantom.builder.query.ExecutableStatement*)(implicit session: com.datastax.driver.core.Session)com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified] <and>
[error] (query: com.websudos.phantom.builder.query.Batchable with com.websudos.phantom.builder.query.ExecutableStatement)(implicit session: com.datastax.driver.core.Session)com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified]
[error] cannot be applied to (scala.concurrent.Future[com.datastax.driver.core.ResultSet])
[error] .add(ChainDatabase.bt.insertNewBlockTransaction(bt))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 6 s, completed Aug 9, 2016 2:42:30 PM





GenericBlockModel.scala:

case class BlockTransaction(hash: String, txid: String)

sealed class BlockTransactionModel extends CassandraTable[BlockTransactionModel, BlockTransaction] {

override def fromRow(r: Row): BlockTransaction = {
BlockTransaction(
hash(r),
txid(r)
)
}

object hash extends StringColumn(this) with PartitionKey[String]

object txid extends StringColumn(this) with ClusteringOrder[String] with Descending

}

abstract class ConcreteBlockTransactionModel extends BlockTransactionModel with RootConnector {

override val tableName = "block_transactions"

def insertNewBlockTransaction(bt: BlockTransaction): Future[ResultSet] = insertNewRecord(bt).future()

def insertNewRecord(bt: BlockTransaction) = {
insert
.value(_.hash, bt.hash)
.value(_.txid, bt.txid)
}
}





Database.scala

class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {

def insertBlock(block: Block) = {
Batch.logged
.add(ChainDatabase.block.insertNewRecord(block))
.future()
}

def insertTransaction(tx: Transaction) = {
Batch.logged
.add(ChainDatabase.tx.insertNewTransaction(tx))
.future()
}

def insertBlockTransaction(bt: BlockTransaction) = {
Batch.logged
.add(ChainDatabase.btx.insertNewBlockTransaction(bt))
.future()
}

object block extends ConcreteBlocksModel with keyspace.Connector

object tx extends ConcreteTransactionsModel with keyspace.Connector

object btx extends ConcreteBlockTransactionsModel with keyspace.Connector


}

object ChainDatabase extends Database(Config.keySpaceDefinition)

Answer

The error is obviously that you are trying to add a Future to a Batch, when a Batch needs a query. If you already triggered a query, it's not possible to batch it anymore, so you need to stop one step ahead. Here's how:

def insertNewRecord(
  bt: BlockTransaction
): InsertQuery.Default[BlockTransactionModel, BlockTransaction] = {
  insert
    .value(_.hash, bt.hash)
    .value(_.txid, bt.txid)
}

Now you can add multiple records to a batch with:

Batch.logged.add(insertNewRecord(record1)
 .add(insertNewRecord(record2))
 // etc

On a different note a batch in Cassandra is not used to do parallel inserts, instead it is used to guarantee atomicity which makes it in general at least 30% slower than a normal parallel insert. Read this for more details.

If you simply want to insert more things at the same time, you can use the method that returns a future like this:

def insertMany(
  list: List[BlockTransaction]
): Future[List[ResultSet]] = {
  Future.sequence(list.map(insertNewRecord(_).future()))
}
Comments