cassandra 添加重复记录时,IfNotExists不返回错误

ibps3vxo  于 2022-11-05  发布在  Cassandra
关注(0)|答案(1)|浏览(279)

我的查询在插入时检查记录是否重复

def insertValues(tableName:String, model:User):Insert = {
    QueryBuilder.insertInto(tableName).value("bucket",model.profile.internalProfileDetails.get.bucketId)
....
      .ifNotExists();
  }

我正在保存一个重复的条目,并希望Cassandra返回一个错误。但我却返回了现有的记录。Insert不应该返回一个错误吗?

def save(user:User):Future[Option[User]] = Future {
    saveDataToDatabase(user)
}

def saveDataToDatabase(data:M):Option[M] = { 
    println("inserting in table "+tablename+" with partition key  "+partitionKeyColumns +" and values "+data)

    val insertQuery = insertValues(tablename,data)
    println("insert query is "+insertQuery)
    try {
      val resultSet = session.execute(insertQuery) //execute can take a Statement. Insert is derived from Statement so I can use Insert.
      println("resultset after insert: " + resultSet)
      Some(data)
    }catch {
      case e:Exception => { 
        println("cassandra exception "+e)
        None
      }
    }
  }

表模式为

users (
    bucket int,
    email text,
    authprovider text,
    firstname text,
    lastname text,
    confirmed boolean,
    hasher text,
    id uuid,
    password text,
    salt text,
    PRIMARY KEY ((bucket, email), authprovider, firstname, lastname)

在我的测试用例中,我期望返回值为None,但我得到的是Some(用户)

"UsersRepository" should {
    "not save a new user if the user already exist in the system" in {

     val insertUserStatement =
            s"""
               | INSERT INTO users (bucket,email,firstname,lastname,authprovider,password,confirmed,id,hasher,salt) VALUES
               | (1,'${testEnv.email}','fn','ln','${testEnv.loginInfo.providerID}','somePassword',false,${testEnv.mockHelperMethods.getUniqueID()},'someHasher','someSalt')
            """.stripMargin

     testCassandra.executeScripts(new CqlStatements(insertUserStatement))

      val userKeys = UserKeys(1, testEnv.email ,testEnv.loginInfo, "fn", "ln")

      val cassandraConnectionService = CassandraConnectionManagementService()
      val (cassandraSession,cluster) = cassandraConnectionService.connectWithCassandra()
      cassandraConnectionService.initKeySpace(cassandraSession,"mykeyspace")

      val userRepository = new UsersRepository(testEnv.mockHelperMethods,cassandraSession,"users")

      val resultCheckUser  = await[Option[User]](userRepository.findOne(userKeys))(Timeout(Duration(5000,"millis")))

      val user = User(UUID.fromString("11111111-1111-1111-1111-111111111111"),
        UserProfile(
          Some(InternalUserProfile(LoginInfo("credentials","test@test.com"),1,false,Some(PasswordInfo("someHasher","somePassword",None)))),
          ExternalUserProfile("test@test.com","fn","ln",None)))

      println(s"found initial user result ${resultCheckUser}")
      resultCheckUser mustBe Some(user)

      println(s"user already exists. Will try to add duplicate ")
      println(s"adding user with user ${user}")

      val resultAddUser  = await[Option[User]](userRepository.save(user))(Timeout(Duration(5000,"millis")))

      resultAddUser mustBe None

    }
  }

测试执行的输出

insert query is INSERT INTO users (bucket,email,authprovider,firstname,lastname,confirmed,id,password,hasher,salt) VALUES (1,'test@test.com','credentials','fn','ln',false,11111111-1111-1111-1111-111111111111,'somePassword','someHasher','') IF NOT EXISTS;
[info] c.g.n.e.c.Cassandra - INFO  [Native-Transport-Requests-1] 2019-06-07 06:13:57,659 OutboundTcpConnection.java:108 - OutboundTcpConnection using coalescing strategy DISABLED
[info] c.g.n.e.c.Cassandra - INFO  [HANDSHAKE-localhost/127.0.0.1] 2019-06-07 06:13:57,683 OutboundTcpConnection.java:560 - Handshaking version with localhost/127.0.0.1
resultset after insert: ResultSet[ exhausted: false, Columns[[applied](boolean), bucket(int), email(varchar), authprovider(varchar), firstname(varchar), lastname(varchar), confirmed(boolean), hasher(varchar), id(uuid), password(varchar), salt(varchar)]]
running afterEach statements
afterEach: cassandra state is STARTED
[debug] c.g.n.e.c.t.TestCassandra - Stop TestCassandra 3.11.1

Some(User(11111111-1111-1111-1111-111111111111,UserProfile(Some(InternalUserProfile(LoginInfo(credentials,test@test.com),1,false,Some(PasswordInfo(someHasher,somePassword,None)))),ExternalUserProfile(test@test.com,fn,ln,None)))) was not equal to None
ScalaTestFailureLocation: UnitSpecs.RepositorySpecs.UsersRepositorySpecs at (UsersRepositorySpecs.scala:362)
Expected :None
Actual   :Some(User(11111111-1111-1111-1111-111111111111,UserProfile(Some(InternalUserProfile(LoginInfo(credentials,test@test.com),1,false,Some(PasswordInfo(someHasher,somePassword,None)))),ExternalUserProfile(test@test.com,fn,ln,None))))
kpbwa7wx

kpbwa7wx1#

executeQuery返回具有wasApplied方法的ResultSet。如果完成了insert操作,则该方法返回true,否则返回false。如果记录重复,则wasAppliedfalse

try {
      val resultSet = session.execute(insertQuery) //execute can take a Statement. Insert is derived from Statement so I can use Insert.
      println("resultset after insert: " + resultSet)
      if(resultSet.wasApplied()){
        Some(data)
      } else {
        None
      }
    }catch {
      case e:Exception => {
        println("cassandra exception "+e)
        None
      }
    }

相关问题