Browse Source

initial commit

Peter Cai 1 year ago
commit
89677b30b0
Signed by: Peter Cai <[email protected]> GPG Key ID: 71F5FB4E4F3FD54F
7 changed files with 1836 additions and 0 deletions
  1. 2
    0
      .gitignore
  2. 1628
    0
      Cargo.lock
  3. 5
    0
      Cargo.toml
  4. 12
    0
      backend/Cargo.toml
  5. 136
    0
      backend/src/app.rs
  6. 26
    0
      backend/src/life.rs
  7. 27
    0
      backend/src/main.rs

+ 2
- 0
.gitignore View File

@@ -0,0 +1,2 @@
1
+data/
2
+target/

+ 1628
- 0
Cargo.lock
File diff suppressed because it is too large
View File


+ 5
- 0
Cargo.toml View File

@@ -0,0 +1,5 @@
1
+[workspace]
2
+
3
+members = [
4
+  "backend",
5
+]

+ 12
- 0
backend/Cargo.toml View File

@@ -0,0 +1,12 @@
1
+[package]
2
+name = "backend"
3
+version = "0.1.0"
4
+authors = ["Peter Cai <[email protected]>"]
5
+
6
+[dependencies]
7
+actix-web = "0.7"
8
+byteorder = "1.2"
9
+futures = "0.1"
10
+sled = "0.15"
11
+tokio = "0.1"
12
+tokio-threadpool = "0.1"

+ 136
- 0
backend/src/app.rs View File

@@ -0,0 +1,136 @@
1
+use byteorder::{ByteOrder, LittleEndian};
2
+use futures::{Canceled, Future};
3
+use futures::future::poll_fn;
4
+use futures::sync::oneshot::channel;
5
+use sled::{ConfigBuilder, Tree, Error as DbError};
6
+use std::sync::Arc;
7
+use tokio_threadpool::{BlockingError, blocking, ThreadPool};
8
+
9
+#[derive(Debug)]
10
+#[allow(dead_code)]
11
+pub enum AngryError<E> {
12
+    BlockingError(BlockingError),
13
+    DbError(DbError<E>),
14
+    Plain(E),
15
+    Nothing
16
+}
17
+
18
+impl<E> ::std::fmt::Display for AngryError<E> {
19
+    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
20
+        write!(f, "TODO")
21
+    }
22
+}
23
+
24
+impl<E> From<BlockingError> for AngryError<E> {
25
+    fn from(err: BlockingError) -> AngryError<E> {
26
+        AngryError::BlockingError(err)
27
+    }
28
+}
29
+
30
+impl<E> From<DbError<E>> for AngryError<E> {
31
+    fn from(err: DbError<E>) -> AngryError<E> {
32
+        AngryError::DbError(err)
33
+    }
34
+}
35
+
36
+impl<E> From<Canceled> for AngryError<E> {
37
+    fn from(_err: Canceled) -> AngryError<E> {
38
+        AngryError::Nothing
39
+    }
40
+}
41
+
42
+macro_rules! flatten_error {
43
+    ($x:expr) => {
44
+        $x.then(|r| {
45
+            match r {
46
+                Err(err) => Err(err.into()),
47
+                Ok(res) => match res {
48
+                    Err(err) => Err(err.into()),
49
+                    Ok(res) => Ok(res)
50
+                }
51
+            }
52
+        })
53
+    }
54
+}
55
+
56
+// Shared state for this application
57
+// Mostly, the database and the thread pool
58
+#[derive(Clone)]
59
+pub struct AngryAppState {
60
+    db: Tree,
61
+    pool: Arc<ThreadPool>
62
+}
63
+
64
+impl AngryAppState {
65
+    pub fn new(db_path: String) -> AngryAppState {
66
+        let config = ConfigBuilder::new()
67
+            .path(db_path)
68
+            .build();
69
+        let db = Tree::start(config).unwrap();
70
+        AngryAppState {
71
+            db,
72
+            pool: Arc::new(ThreadPool::new())
73
+        }
74
+    }
75
+
76
+    pub fn get_db(&self) -> DbExt {
77
+        DbExt {
78
+            db: self.db.clone()
79
+        }
80
+    }
81
+
82
+    // Execute a Future on the thread pool context created in this object
83
+    // and return its results as an identical future
84
+    // This is used for futures that contain `blocking()` invocations
85
+    // and actix-web itself does not run on tokio's threadpool infrastructure
86
+    pub fn spawn_pool<F, I, E>(&self, f: F) -> impl Future<Item = I, Error = AngryError<E>>
87
+        where F: Future<Item = I, Error = AngryError<E>> + Send + 'static,
88
+              I: Send + 'static,
89
+              E: Send + 'static {
90
+        let (tx, rx) = channel();
91
+        self.pool.spawn(
92
+            f.then(move |r| tx.send(r).map_err(|_| ())));
93
+        flatten_error!(rx)
94
+    }
95
+}
96
+
97
+pub struct DbExt {
98
+    db: Tree
99
+}
100
+
101
+// Convenience methods for operation on the sled database
102
+// All of these methods should be run in a ThreadPool context
103
+impl DbExt {
104
+    pub fn get_async<K: Into<Vec<u8>>>(&self, key: K) -> impl Future<Item = Option<Vec<u8>>, Error = AngryError<()>> {
105
+        let db = self.db.clone();
106
+        let k = key.into();
107
+        flatten_error!(poll_fn(move || blocking(|| {
108
+            let res = db.get(&k);
109
+            //db.flush()?;
110
+            res
111
+        })))
112
+    }
113
+
114
+    pub fn set_async<K: Into<Vec<u8>>>(&self, key: K, value: Vec<u8>) -> impl Future<Item = (), Error = AngryError<()>> {
115
+        let db = self.db.clone();
116
+        let k = key.into();
117
+        flatten_error!(poll_fn(move || blocking(|| {
118
+            db.set(k.clone(), value.clone())?;
119
+            // TODO: Maybe we should not flush here?
120
+            // Maybe we should flush it in a separate thread periodically?
121
+            db.flush()
122
+        })))
123
+    }
124
+
125
+    pub fn get_async_u64<K: Into<Vec<u8>>>(&self, key: K) -> impl Future<Item = u64, Error = AngryError<()>> {
126
+        self.get_async(key)
127
+            .map(|r| r.unwrap_or(vec![0u8; 8]))
128
+            .map(|r| LittleEndian::read_u64(&r))
129
+    }
130
+
131
+    pub fn set_async_u64<K: Into<Vec<u8>>>(&self, key: K, value: u64) -> impl Future<Item = (), Error = AngryError<()>> {
132
+        let mut v = vec![0u8; 8];
133
+        LittleEndian::write_u64(&mut v, value);
134
+        self.set_async(key, v)
135
+    }
136
+}

+ 26
- 0
backend/src/life.rs View File

@@ -0,0 +1,26 @@
1
+use actix_web::{http, App, AsyncResponder, Responder, HttpRequest, HttpResponse};
2
+use actix_web::error::ErrorInternalServerError;
3
+use app::AngryAppState;
4
+use futures::Future;
5
+
6
+pub fn setup_routes(app: App<AngryAppState>) -> App<AngryAppState> {
7
+    app.resource("/p/life", |r| r.method(http::Method::POST).f(life_add))
8
+}
9
+
10
+fn life_add(req: &HttpRequest<AngryAppState>) -> impl Responder {
11
+    // TODO: Rate limit per-IP!
12
+    let state = req.state().clone();
13
+    let db = state.get_db();
14
+    state.spawn_pool(db.get_async_u64("life_secs"))
15
+        .and_then(move |a| {
16
+            let new_a = a + 1;
17
+            state.spawn_pool(db.set_async_u64("life_secs", new_a))
18
+                .and_then(move |_| Ok(HttpResponse::Ok().body(format!("{}", new_a))))
19
+        })
20
+        .map_err(|e| {
21
+            // TODO: Properly return the error
22
+            println!("{:?}", e);
23
+            ErrorInternalServerError(e)
24
+        })
25
+        .responder()
26
+}

+ 27
- 0
backend/src/main.rs View File

@@ -0,0 +1,27 @@
1
+extern crate actix_web;
2
+extern crate byteorder;
3
+extern crate futures;
4
+extern crate sled;
5
+extern crate tokio_threadpool;
6
+
7
+#[macro_use]
8
+mod app;
9
+mod life;
10
+
11
+use actix_web::{http, server, App, Path, Responder};
12
+
13
+fn main() {
14
+    let state = app::AngryAppState::new("data".to_owned());
15
+    server::new(move || {
16
+        let mut app = App::with_state(state.clone())
17
+            .route("/test_index.html", http::Method::GET, test_index);
18
+        app = life::setup_routes(app);
19
+        app
20
+    })
21
+    .bind("127.0.0.1:60324").unwrap()
22
+    .run()
23
+}
24
+
25
+fn test_index(_info: Path<()>) -> impl Responder {
26
+    "I'm angry!"
27
+}

Loading…
Cancel
Save