diff --git a/configure.py b/configure.py index b0934fd413..f1a26de589 100755 --- a/configure.py +++ b/configure.py @@ -374,6 +374,7 @@ scylla_core = (['database.cc', 'service/storage_service.cc', 'service/pending_range_calculator_service.cc', 'service/load_broadcaster.cc', + 'service/pager/paging_state.cc', 'streaming/streaming.cc', 'streaming/stream_task.cc', 'streaming/stream_session.cc', diff --git a/service/pager/paging_state.cc b/service/pager/paging_state.cc new file mode 100644 index 0000000000..b0b103475c --- /dev/null +++ b/service/pager/paging_state.cc @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "bytes.hh" +#include "keys.hh" +#include "utils/data_input.hh" +#include "utils/data_output.hh" +#include "db/serializer.hh" +#include "paging_state.hh" + +service::pager::paging_state::paging_state(partition_key pk, clustering_key ck, + uint32_t rem) + : _partition_key(std::move(pk)), _clustering_key(ck), _remaining(rem) { +} + +::shared_ptr service::pager::paging_state::deserialize( + bytes_opt data) { + if (!data) { + return nullptr; + } + + data_input in(*data); + + try { + auto pk = db::serializer::read(in); + auto ck = db::serializer::read(in); + auto rem(in.read()); + + return ::make_shared(std::move(pk), std::move(ck), rem); + } catch (...) { + std::throw_with_nested( + exceptions::protocol_exception( + "Invalid value for the paging state")); + } +} + +bytes_opt service::pager::paging_state::serialize() const { + auto pkv = _partition_key.view(); + auto ckv = _clustering_key.view(); + + db::serializer pks(pkv); + db::serializer cks(ckv); + + auto size = pks.size() + cks.size() + sizeof(uint32_t); + + bytes res{bytes::initialized_later(), size}; + data_output out(res); + + pks.write(out); + cks.write(out); + out.write(_remaining); + + return {res}; +} diff --git a/service/pager/paging_state.hh b/service/pager/paging_state.hh index d975a1a734..205af05c17 100644 --- a/service/pager/paging_state.hh +++ b/service/pager/paging_state.hh @@ -41,80 +41,43 @@ #pragma once -#include "unimplemented.hh" #include "bytes.hh" +#include "keys.hh" namespace service { namespace pager { class paging_state final { -#if 0 - public final ByteBuffer partitionKey; - public final ByteBuffer cellName; - public final int remaining; - - public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining) - { - this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey; - this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName; - this.remaining = remaining; - } - - public static PagingState deserialize(ByteBuffer bytes) - { - if (bytes == null) - return null; - - try - { - DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes)); - ByteBuffer pk = ByteBufferUtil.readWithShortLength(in); - ByteBuffer cn = ByteBufferUtil.readWithShortLength(in); - int remaining = in.readInt(); - return new PagingState(pk, cn, remaining); - } - catch (IOException e) - { - throw new ProtocolException("Invalid value for the paging state"); - } - } -#endif + partition_key _partition_key; + clustering_key _clustering_key; + uint32_t _remaining; public: + paging_state(partition_key pk, clustering_key ck, uint32_t rem); - bytes_opt serialize() { - fail(unimplemented::cause::PAGING); -#if 0 - try - { - DataOutputBuffer out = new DataOutputBuffer(serializedSize()); - ByteBufferUtil.writeWithShortLength(partitionKey, out); - ByteBufferUtil.writeWithShortLength(cellName, out); - out.writeInt(remaining); - return out.asByteBuffer(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } -#endif + /** + * Last processed key, i.e. where to start from in next paging round + */ + const partition_key& get_partition_key() const { + return _partition_key; + } + /** + * Clustering key in last partition. I.e. first, next, row + */ + const clustering_key& get_clustering_key() const { + return _clustering_key; + } + /** + * Max remaining rows to fetch in total. + * I.e. initial row_limit - #rows returned so far. + */ + uint32_t get_remaining() const { + return _remaining; } -#if 0 - private int serializedSize() - { - return 2 + partitionKey.remaining() - + 2 + cellName.remaining() - + 4; - } - - @Override - public String toString() - { - return String.format("PagingState(key=%s, cellname=%s, remaining=%d", ByteBufferUtil.bytesToHex(partitionKey), ByteBufferUtil.bytesToHex(cellName), remaining); - } -#endif + static ::shared_ptr deserialize(bytes_opt bytes); + bytes_opt serialize() const; }; }