Kaynağa Gözat

Concurrent queue working

Joshua Rutschmann 7 yıl önce
ebeveyn
işleme
9a915eb9d7

+ 4
- 0
hw10/task1/Cargo.toml Dosyayı Görüntüle

@@ -6,5 +6,9 @@ authors = ["Joshua Rutschmann <joshua.rutschmann@gmx.de>", "Lorenz Bung <lorenz.
6 6
 [dependencies]
7 7
 srv-commands = { path = "srv-commands" }
8 8
 srv-config = { path = "srv-config" }
9
+srv-hasher = { path = "srv-hasher" }
10
+serde = "1.0.27"
11
+serde_derive = "1"
12
+serde_json = "1"
9 13
 
10 14
 [workspace]

+ 126
- 46
hw10/task1/src/main.rs Dosyayı Görüntüle

@@ -1,12 +1,61 @@
1 1
 extern crate srv_commands;
2 2
 extern crate srv_config;
3
+extern crate srv_hasher;
4
+
5
+#[macro_use]
6
+extern crate serde_derive;
7
+extern crate serde;
8
+extern crate serde_json;
3 9
 
4 10
 use std::io::{BufReader, BufRead, BufWriter, Write};
5 11
 use std::collections::VecDeque;
6 12
 use std::net::{TcpListener, TcpStream};
7 13
 use srv_commands::Command;
8 14
 use srv_config::Config;
9
-use HashServerError::Parse;
15
+use srv_hasher::Solution;
16
+use std::{process, io, thread};
17
+use std::sync::Arc;
18
+use std::sync::Mutex;
19
+
20
+//#[derive(Serialize, Deserialize)]
21
+#[derive(Clone, Debug)]
22
+struct OrderQueue<T: Send + Copy> {
23
+    inner: Arc<Mutex<VecDeque<T>>>,
24
+}
25
+
26
+impl<T: Send + Copy> OrderQueue<T> {
27
+
28
+    fn new() -> Self {
29
+        Self { inner: Arc::new(Mutex::new(VecDeque::new())) }
30
+    }
31
+
32
+    fn deserialize() {
33
+        
34
+    }
35
+
36
+    fn serialize(&self) {
37
+        let inner_queue = self.inner.lock();
38
+        if let Ok(mut queue) = inner_queue {
39
+        }
40
+    }
41
+    
42
+    fn get_latest_order(&self) -> Option<T> {
43
+        let inner_queue = self.inner.lock();
44
+        if let Ok(mut queue) = inner_queue {
45
+            queue.pop_front()
46
+        } else {
47
+            panic!("MutexError");
48
+        }
49
+    }
50
+
51
+    fn add_order(&self, order: T) {
52
+        if let Ok(mut queue) = self.inner.lock() {
53
+            queue.push_front(order);
54
+        } else {
55
+            panic!("MutexError");
56
+        }
57
+    }
58
+}
10 59
 
11 60
 #[derive(Debug)]
12 61
 pub enum HashServerError {
@@ -26,7 +75,7 @@ impl From<srv_commands::ParseError> for HashServerError {
26 75
     }
27 76
 }
28 77
 
29
-fn handle_client(stream: &TcpStream, orders: &mut VecDeque<String>, v: bool) {
78
+fn handle_client(stream: &TcpStream, orders:OrderQueue<&str>) {
30 79
     let mut reader = BufReader::new(stream);
31 80
     let mut writer = BufWriter::new(stream);
32 81
 
@@ -38,32 +87,19 @@ fn handle_client(stream: &TcpStream, orders: &mut VecDeque<String>, v: bool) {
38 87
                     break;
39 88
                 }
40 89
 
41
-                let cmd = srv_commands::parse(&line).map_err(HashServerError::Parse);
90
+                let mut words = line.split_whitespace();
91
+                match words.next() {
92
+                    Some(difficulty) => {
42 93
 
43
-                match cmd {
44
-                    Ok(Command::Stage(str)) => {
45
-                        orders.push_front(str);
46
-                    }
47
-                    Ok(Command::Retrieve) => {
48
-                        if orders.is_empty() {
49
-                            let _ = writer.write(b"No order on stage!\n");
50
-
51
-                        } else {
52
-                            if let Some(latest_order) = orders.pop_front() {
53
-                                let _ = writer.write(latest_order.as_bytes());
54
-                                let _ = writer.write(b"\n");
55
-                            }
94
+                        if let Some(range) = words.next() {
56 95
                         }
57
-                    }
58
-                    Ok(Command::Control(ref control_string)) => {
59
-                        if v {
60
-                            println!("Received Control: {}", control_string);
96
+
97
+                        if let Some(port) = words.next() {
61 98
                         }
62
-                    }
63
-                    Err(Parse(e)) => {
64
-                        println!("Error occurred: {:?}", e);
65
-                    }
66
-                    _ => {}
99
+                        
100
+                        orders.add_order("Hello");
101
+                    },
102
+                    None => {},
67 103
                 }
68 104
 
69 105
                 let _ = writer.flush();
@@ -76,7 +112,6 @@ fn handle_client(stream: &TcpStream, orders: &mut VecDeque<String>, v: bool) {
76 112
 }
77 113
 
78 114
 pub fn main() {
79
-
80 115
     let c = Config::load();
81 116
 
82 117
     if c.verbosity > 0 {
@@ -92,30 +127,75 @@ pub fn main() {
92 127
 
93 128
     let host = format!("{}:{}", c.address, c.port);
94 129
 
95
-    let mut orders: VecDeque<String> = VecDeque::new();
96
-
97
-    if c.testing {
98
-        orders.push_front(String::from("Test3"));
99
-        orders.push_front(String::from("Test2"));
100
-        orders.push_front(String::from("Test1"));
101
-    }
102
-
103
-    match TcpListener::bind(host).map_err(HashServerError::Io) {
104
-        Ok(listener) => {
105
-            for s in listener.incoming() {
106
-                if let Ok(stream) = s {
107
-                    if c.verbosity > 1 {
108
-                        println!("[DEBUG] New Client connected")
109
-                    }
110
-                    handle_client(&stream, &mut orders, c.verbosity > 0);
111
-                    if c.verbosity > 1 {
112
-                        println!("[DEBUG] Client diconnected")
130
+    let orders: OrderQueue<&str> = OrderQueue::new();
131
+    let mut results: VecDeque<Solution> = VecDeque::new();
132
+
133
+    let ord = orders.clone();
134
+
135
+    thread::spawn(move || {
136
+        match TcpListener::bind(host).map_err(HashServerError::Io) {
137
+            Ok(listener) => {
138
+                for s in listener.incoming() {
139
+                    if let Ok(stream) = s {
140
+                        if c.verbosity > 1 {
141
+                            println!("[DEBUG] New Client connected");
142
+                        }                       
143
+                        
144
+                        let order_queue = orders.clone();
145
+                        
146
+                        thread::spawn(move || {
147
+                            handle_client(&stream, order_queue);
148
+                        });
149
+
150
+                        if c.verbosity > 1 {
151
+                            println!("[DEBUG] Client diconnected")
152
+                        }
113 153
                     }
114 154
                 }
155
+            },
156
+            Err(e) => {
157
+                println!("Failed to start the MultiHashServer: {:?}", e);
115 158
             }
116 159
         }
117
-        Err(e) => {
118
-            println!("Failed to start the MultiHashServer: {:?}", e);
160
+    
161
+    });
162
+
163
+    let stdin = io::stdin();
164
+    let mut sl = stdin.lock();
165
+    let stdout = io::stdout();
166
+    let mut ol = stdout.lock();
167
+    
168
+    loop {
169
+        ol.write(b"HashServer> ");
170
+        ol.flush();
171
+        let mut line = String::new();
172
+        match sl.read_line(&mut line) {
173
+            Ok(_) => {
174
+                let mut lines = line.trim_right().split(' ');
175
+                match lines.next() {
176
+                    Some("exit") => process::exit(0),
177
+                    Some("halt") => {
178
+                        println!("Saving state...");
179
+                        //Serialize here
180
+                        process::exit(0);
181
+                    },
182
+                    Some("get") => {
183
+                        ord.serialize();
184
+                        println!("{:?}", ord);
185
+                    },
186
+                    Some("continue") => {
187
+                        println!("Restoring state")
188
+                    },
189
+                    Some("threads") => {
190
+                        if let Some(thr) = lines.next() {
191
+                            println!("Setting thread count to {}", thr);
192
+                        }
193
+                    },
194
+                    Some(&_) => {},
195
+                    None => {},
196
+                }
197
+            }
198
+            Err(_) => {},
119 199
         }
120 200
     }
121 201
 }

+ 8
- 0
hw10/task1/srv-hasher/Cargo.toml Dosyayı Görüntüle

@@ -0,0 +1,8 @@
1
+[package]
2
+name = "srv-hasher"
3
+version = "0.1.0"
4
+authors = ["Joshua Rutschmann <joshua.rutschmann@gmx.de>"]
5
+
6
+[dependencies]
7
+sha2 = "0.7.0"
8
+time = "0.1"

+ 309
- 0
hw10/task1/srv-hasher/src/lib.rs Dosyayı Görüntüle

@@ -0,0 +1,309 @@
1
+extern crate sha2;
2
+extern crate time;
3
+
4
+#[cfg(feature = "SHA2")]
5
+use self::sha2::Sha256;
6
+use std::thread;
7
+use std::sync::Arc;
8
+use std::sync::atomic::AtomicBool;
9
+use std::sync::atomic::Ordering::Relaxed;
10
+use std::sync::mpsc::{Sender, channel};
11
+use time::{Duration, get_time};
12
+
13
+
14
+pub struct Sha256;
15
+
16
+pub trait Hasher {
17
+    type Output: HashResult;
18
+    fn hash(input: &[u8]) -> Self::Output;
19
+}
20
+
21
+pub trait HashResult {
22
+    /// Get the output in hex notation.
23
+    fn hex(&self) -> String;
24
+    /// Size of the output in bytes.
25
+    fn size() -> usize;
26
+}
27
+
28
+impl Hasher for Sha256 {
29
+    type Output = [u8; 32];
30
+
31
+    fn hash(input: &[u8]) -> Self::Output {
32
+        use self::sha2::*;
33
+        let mut tmp = Sha256::new();
34
+        tmp.input(input);
35
+        let r = tmp.result().as_slice().to_vec();
36
+        [
37
+            r[0],
38
+            r[1],
39
+            r[2],
40
+            r[3],
41
+            r[4],
42
+            r[5],
43
+            r[6],
44
+            r[7],
45
+            r[8],
46
+            r[9],
47
+            r[10],
48
+            r[11],
49
+            r[12],
50
+            r[13],
51
+            r[14],
52
+            r[15],
53
+            r[16],
54
+            r[17],
55
+            r[18],
56
+            r[19],
57
+            r[20],
58
+            r[21],
59
+            r[22],
60
+            r[23],
61
+            r[24],
62
+            r[25],
63
+            r[26],
64
+            r[27],
65
+            r[28],
66
+            r[29],
67
+            r[30],
68
+            r[31],
69
+        ]
70
+    }
71
+}
72
+
73
+impl HashResult for [u8; 32] {
74
+    fn hex(&self) -> String {
75
+        const HEX: [char; 16] = [
76
+            '0',
77
+            '1',
78
+            '2',
79
+            '3',
80
+            '4',
81
+            '5',
82
+            '6',
83
+            '7',
84
+            '8',
85
+            '9',
86
+            'a',
87
+            'b',
88
+            'c',
89
+            'd',
90
+            'e',
91
+            'f',
92
+        ];
93
+        let mut tmp = String::with_capacity(32 * 2);
94
+        for byte in self.iter() {
95
+            tmp.push(HEX[*byte as usize / 16]);
96
+            tmp.push(HEX[*byte as usize % 16]);
97
+        }
98
+        tmp
99
+    }
100
+
101
+    fn size() -> usize {
102
+        32
103
+    }
104
+}
105
+
106
+#[derive(Debug, PartialEq)]
107
+pub struct Solution {
108
+    pub number: usize,
109
+    pub hash: String,
110
+}
111
+
112
+/// Überprüft ob *base* und *number* auf einen Hash abbilden,
113
+/// der mit der übergebenen *difficulty* übereinstimmt.
114
+/// Falls ja, kommt eine `Solution` in der Option mit den Ergebnissen zurück.
115
+/// Falls nein, steht None im Optional
116
+pub fn verify_product(base: usize, number: usize, difficulty: &String) -> Option<Solution> {
117
+    let sol = base * number;
118
+    let input = sol.to_string();
119
+    let bytes = input.as_bytes();
120
+
121
+    let hash = Sha256::hash(bytes).hex();
122
+
123
+    if hash.ends_with(difficulty) {
124
+        return Some(Solution {
125
+            number: number,
126
+            hash: hash,
127
+        });
128
+    }
129
+
130
+    None
131
+}
132
+
133
+/// Sucht nach einem Hash für die angegebene Basis und die Schwierigkeit.
134
+/// Wenn `total` > 1 ist, dann hat jeder Aufruf mit einem anderen `start`-Wert (von 0 - total)
135
+/// eine disjunkte Zahlenmenge für die Suche zur Auswahl.
136
+fn search_hash(
137
+    hash: &String,
138
+    base: usize,
139
+    start: usize,
140
+    total: usize,
141
+    sync: bool,
142
+    found: Arc<AtomicBool>,
143
+    special: usize,
144
+    solution_tx: Sender<Solution>,
145
+    timing_tx: Sender<(Duration, usize)>,
146
+    measure: bool,
147
+) {
148
+    let max = <usize>::max_value();
149
+    let mut n = start;
150
+
151
+    let thread_start = get_time();
152
+    while n < max {
153
+
154
+        // Der special Parameter begrenzt die Anzahl der load()-Aufrufe auf jeden n.ten Loop.
155
+        if n % special == 0 && found.load(Relaxed) {
156
+            // Anderer Thread hat eine Lösung gefunden (sync ist aktiviert). Beende Suche.
157
+            break;
158
+        }
159
+
160
+        if let Some(solution) = verify_product(base, n, hash) {
161
+            if sync {
162
+                found.store(true, Relaxed);
163
+            }
164
+            // Sende gefundene Solution an den Consumer.
165
+            let _ = solution_tx.send(solution);
166
+            // Beende Suche.
167
+            break;
168
+        }
169
+        n += total;
170
+    }
171
+
172
+    // Falls measure aktiviert ist Sende Zeitdauer und Anzahl Loops an den Consumer.
173
+    if measure {
174
+        let thread_end = get_time();
175
+        let _ = timing_tx.send((thread_end - thread_start, (n / total)));
176
+    }
177
+}
178
+
179
+/// Teilt, wenn nötig, die Suche nach dem Hash auf mehrere Threads auf.
180
+/// Gibt ggf. die Solution (für die Tests) zurück.
181
+pub fn search_with_threads(
182
+    threads: usize,
183
+    diff_string: String,
184
+    with_base: usize,
185
+    time_measurement: bool,
186
+    verbosity: u64,
187
+    sync: Option<usize>,
188
+    wait: bool,
189
+) -> Option<Solution> {
190
+
191
+    let diff = Arc::new(diff_string);
192
+    let mut children = vec![];
193
+    let mut solution = None;
194
+
195
+    let (solution_tx, solution_rx) = channel();
196
+    let (timing_tx, timing_rx) = channel();
197
+
198
+    let found = Arc::new(AtomicBool::new(false));
199
+    let m = time_measurement && verbosity > 0;
200
+
201
+    let total_start = get_time();
202
+    if threads > 1 {
203
+
204
+        if verbosity > 0 {
205
+            println!("Searching with {} threads", threads);
206
+        }
207
+
208
+        // Erstellt Anzahl angeforderter Threads.
209
+        // Klont für jeden Thread die Referenz auf die gemeinsamen Variablen.
210
+        for i in 0..threads {
211
+            let diff = diff.clone();
212
+            let solution_tx = solution_tx.clone();
213
+            let timing_tx = timing_tx.clone();
214
+            let found = found.clone();
215
+
216
+            children.push(thread::spawn(move || {
217
+
218
+                // Suche in jedem der Threads.
219
+                search_hash(
220
+                    &diff,
221
+                    with_base,
222
+                    i,
223
+                    threads,
224
+                    sync.is_some(),
225
+                    found,
226
+                    sync.unwrap_or(1),
227
+                    solution_tx,
228
+                    timing_tx,
229
+                    m,
230
+                );
231
+
232
+                if verbosity > 1 {
233
+                    println!("[DEBUG] Thread {} exited", i);
234
+                }
235
+            }));
236
+        }
237
+    } else {
238
+        // Suche auf dem Main-Thread.
239
+        search_hash(
240
+            &diff,
241
+            with_base,
242
+            0,
243
+            1,
244
+            sync.is_some(),
245
+            found,
246
+            sync.unwrap_or(1),
247
+            solution_tx,
248
+            timing_tx,
249
+            m,
250
+        );
251
+
252
+        if verbosity > 1 {
253
+            println!("[DEBUG] Finished search on main thread.");
254
+        }
255
+    }
256
+
257
+    // Empfängt die Lösung von einem der Producer.
258
+    match solution_rx.recv() {
259
+        Ok(sol) => {
260
+            solution = Some(Solution {
261
+                number: sol.number,
262
+                hash: sol.hash.clone(),
263
+            });
264
+            let total_end = get_time();
265
+            println!("Number: {} --> hash: {}", sol.number, sol.hash);
266
+            if time_measurement && verbosity == 0 {
267
+                let diff = total_end - total_start;
268
+                let s = diff.num_seconds();
269
+                let ms = diff.num_milliseconds();
270
+                let us = diff.num_microseconds().unwrap_or(ms * 1000);
271
+                println!("(Duration {}s / {}ms / {}us)", s, ms, us);
272
+            }
273
+        }
274
+        Err(_) => {}
275
+    }
276
+
277
+    let mut sum_loops = 0usize;
278
+    let mut sum_time: Duration = Duration::zero();
279
+
280
+    for child in children {
281
+
282
+        if time_measurement && verbosity > 0 {
283
+            // Warte auf Zeitstatistik-Nachricht von jedem Thread auf dem zweiten MPSC Channel.
284
+            if let Ok(stats) = timing_rx.recv() {
285
+                // Addiere Werte dieses threads zu der Summe.
286
+                sum_time = sum_time + stats.0;
287
+                sum_loops += stats.1;
288
+            }
289
+        }
290
+
291
+        // Falls *wait* wahr ist, warte auf aktuellen thread (child)
292
+        if wait {
293
+            let _ = child.join();
294
+        }
295
+    }
296
+
297
+    // Gebe die Gesamtergebnisse der Threads aus.
298
+    if time_measurement && verbosity > 0 {
299
+        println!("Sum Loops in Producers:       {}", sum_loops);
300
+        let s = sum_time.num_seconds();
301
+        let ms = sum_time.num_milliseconds();
302
+        let us = sum_time.num_microseconds().unwrap_or(ms * 1000);
303
+        println!("Sum Duration in Producers:    {}s / {}ms / {}us", s, ms, us);
304
+    }
305
+
306
+    // Gebe die Option einer Solution zurück.
307
+    solution
308
+}
309
+

Loading…
İptal
Kaydet